2022-04-05 05:59:23 +00:00
/***
Copyright ( C ) 2022 J Reece Wilson ( a / k / a " Reece " ) . All rights reserved .
File : LoopQueue . Linux . cpp
Date : 2022 - 4 - 5
Author : Reece
* * */
# include <Source/RuntimeInternal.hpp>
# include "Loop.NT.hpp"
# include "ILoopSourceEx.hpp"
# include "LoopQueue.Linux.hpp"
2022-04-07 01:20:46 +00:00
# include <poll.h>
# include <sys/epoll.h>
2022-04-09 15:53:14 +00:00
# include <Source/Time/Time.hpp>
2022-04-13 11:00:35 +00:00
# include <Source/IO/UNIX/IOSubmit.Linux.hpp>
2022-04-05 05:59:23 +00:00
2023-08-27 11:41:51 +00:00
# if defined(AURORA_COMPILER_CLANG)
// warning: ISO C++20 considers use of overloaded operator '!=' (with operand types 'AuSPtr<Aurora::IO::Loop::ILoopSource>' (aka 'ExSharedPtr<Aurora::IO::Loop::ILoopSource, std::shared_ptr<ILoopSource>>') and 'typename tuple_element<0UL, tuple<ExSharedPtr<ILoopSource, shared_ptr<ILoopSource>>, ExSharedPtr<ILoopSourceSubscriber, shared_ptr<ILoopSourceSubscriber>>, ExSharedPtr<ILoopSourceSubscriberEx, shared_ptr<ILoopSourceSubscriberEx>>>>::type' (aka '__type_pack_element<0UL, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSource, std::shared_ptr<Aurora::IO::Loop::ILoopSource>>, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSourceSubscriber, std::shared_ptr<Aurora::IO::Loop::ILoopSourceSubscriber>>, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSourceSubscriberEx, std::shared_ptr<Aurora::IO::Loop::ILoopSourceSubscriberEx>>>')) to be ambiguous despite there being a unique best viable function with non-reversed arguments [-Wambiguous-reversed-operator]
# pragma clang diagnostic ignored "-Wambiguous-reversed-operator"
// Yea, I couldn't give less of a nanoshit what some C++20 spec says. Even llvm/clang doesn't care to language police it into a fatal unimplemented compiler condition. So, idc.
# endif
2022-06-11 23:52:46 +00:00
namespace Aurora : : IO : : Loop
2022-04-05 05:59:23 +00:00
{
// On Linux, Loop Queues are glorified eventfd to epoll adapters.
// Requeuing the cached fd array per frame on the TLS io_submit object
// would be more costly than maintaining an epoll for all fds tracked
// by the loop queue.
// We can delegate the wait functions to an NT overlapped like shim
// where all eventfds are one epoll handle
// The TLS worker would get awoken by any changes in the epoll queue
// or if the io submit object should preemptively abort
// The TLS worker would remain responsible for scheduling thread local
// network and file transactions independent from the loop queues
// As such, loop queues continue to be defined as a mechanism to merely
// wait, not dispatch/manage work
2022-04-05 10:11:19 +00:00
// Delegating mutex reads to a single io_submit would be a linux-specific
// kevent-non-reusable ThreadWorkerQueueShim hack
// ...it wouldn't make sense to create another loop queue per thread concept
// outside of the async subsystem (not counting TLS overlapped io)
2022-04-05 05:59:23 +00:00
2022-04-09 15:53:14 +00:00
/**
 * Builds an empty loop queue.
 *
 * Only the lockStealer_ member is set up here; the epoll descriptor and the
 * reader/writer locks are created by Init(), which must succeed before use.
 * NOTE(review): the meaning of the (false, false, true) arguments is defined
 * by the lockStealer_ type, which is declared elsewhere - confirm there.
 */
LoopQueue::LoopQueue() :
    lockStealer_(false, false, true)
{ }
/**
 * Tears the queue down by delegating to Deinit(), which closes the epoll
 * descriptor if one is still open.
 */
LoopQueue::~LoopQueue()
{
    this->Deinit();
}
bool LoopQueue : : Init ( )
{
2022-12-16 03:48:04 +00:00
this - > epollFd_ = epoll_create1 ( EPOLL_CLOEXEC ) ;
2022-04-09 15:53:14 +00:00
if ( this - > epollFd_ = = - 1 )
{
return false ;
}
2022-04-07 01:20:46 +00:00
2022-04-09 15:53:14 +00:00
this - > sourceMutex_ = AuThreadPrimitives : : RWLockUnique ( ) ;
if ( ! this - > sourceMutex_ )
{
return false ;
}
this - > polledItemsMutex_ = AuThreadPrimitives : : RWLockUnique ( ) ;
if ( ! this - > polledItemsMutex_ )
{
return false ;
}
this - > globalEpoll_ . parent = this ;
return AuTryInsert ( this - > alternativeEpolls_ , & this - > globalEpoll_ ) ;
2022-04-07 01:20:46 +00:00
}
2022-04-05 05:59:23 +00:00
2022-04-07 01:20:46 +00:00
void LoopQueue : : Deinit ( )
{
2022-04-09 15:53:14 +00:00
int fd ;
if ( ( fd = AuExchange ( this - > epollFd_ , - 1 ) ) ! = - 1 )
{
: : close ( fd ) ;
}
}
// Registers the file descriptors of `source` with the parent's epoll descriptor.
//
// Per-fd reference counts are kept in startingWorkRead / startingWorkWrite; the
// kernel registration is only touched when a count was zero beforehand, or when
// an fd ends up being watched for both directions.
// Sources without an ILoopSourceEx interface (sourceExtended is null) are ignored.
// Every registration stores `source` in epoll_event::data.ptr.
//
// NOTE(review): none of the epoll_ctl return values are checked.
// NOTE(review): unlike Remove(), this function does not take this->lock - confirm
//               that callers serialise access to the two count tables.
void LoopQueue : : AnEpoll : : Add ( SourceExtended * source )
{
epoll_event event ;
auto ex = source - > sourceExtended ;
if ( ! ex )
{
return ;
}
event . data . ptr = source ;
if ( ex - > Singular ( ) )
{
// Single read handle and/or single write handle.
// bDouble becomes true when one of this source's handles already has an
// entry in the opposite-direction table (this includes read == write,
// because the read count is bumped before the write lookup runs).
bool bDouble { } ;
int oldReadRef { } ;
int oldWriteRef { } ;
auto read = ex - > GetHandle ( ) ;
if ( read ! = - 1 )
{
oldReadRef = startingWorkRead [ read ] + + ;
bDouble | = startingWorkWrite . find ( read ) ! = startingWorkWrite . end ( ) ;
}
auto write = ex - > GetWriteHandle ( ) ;
if ( write ! = - 1 )
{
oldWriteRef = startingWorkWrite [ write ] + + ;
bDouble | = startingWorkRead . find ( write ) ! = startingWorkRead . end ( ) ;
}
if ( bDouble )
{
// Watch the fd for both directions: ADD if neither count existed before,
// otherwise MOD the existing registration.
// NOTE(review): this always passes `write` to epoll_ctl, even when bDouble
//               was only set by the `read` lookup and `write` may be -1 - confirm.
epoll_event event ;
event . events = EPOLLOUT | EPOLLIN ;
event . data . ptr = source ;
if ( ( oldReadRef = = 0 ) & & ( oldWriteRef = = 0 ) )
{
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_ADD , write , & event ) ;
}
else
{
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_MOD , write , & event ) ;
}
}
// The two blocks below also run after the bDouble branch; there is no early out.
// NOTE(review): presumably the redundant ADD fails with EEXIST when the fd was
//               just registered above and is silently ignored - confirm.
if ( ( write ! = - 1 ) & & ( ! oldWriteRef ) )
{
event . events = EPOLLOUT ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_ADD , write , & event ) ;
}
if ( ( read ! = - 1 ) & & ( ! oldReadRef ) )
{
event . events = EPOLLIN ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_ADD , read , & event ) ;
}
}
else
{
// Multiple handles per direction.
auto read = ex - > GetHandles ( ) ;
auto write = ex - > GetWriteHandles ( ) ;
for ( auto readHandle : read )
{
auto count = startingWorkRead [ readHandle ] + + ;
if ( count )
{
// Already counted for reading; the registration is left untouched.
continue ;
}
if ( AuExists ( write , readHandle ) )
{
// Also a write handle of this source: the write loop below registers it
// with EPOLLOUT | EPOLLIN.
continue ;
}
event . events = EPOLLIN ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_ADD , readHandle , & event ) ;
}
for ( auto writeHandle : write )
{
auto count = startingWorkWrite [ writeHandle ] + + ;
if ( count )
{
// Already counted for writing; widen the registration to both directions
// when this source reads from the same fd.
if ( AuExists ( read , writeHandle ) )
{
event . events = EPOLLOUT | EPOLLIN ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_MOD , writeHandle , & event ) ;
}
continue ;
}
// First write reference for this fd.
if ( AuExists ( read , writeHandle ) )
{
event . events = EPOLLOUT | EPOLLIN ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_ADD , writeHandle , & event ) ;
}
else
{
event . events = EPOLLOUT ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_ADD , writeHandle , & event ) ;
}
}
}
2022-04-05 05:59:23 +00:00
}
bool LoopQueue : : SourceAdd ( const AuSPtr < ILoopSource > & source )
{
2022-04-09 15:53:14 +00:00
return SourceAddWithTimeout ( source , 0 ) ;
2022-04-05 05:59:23 +00:00
}
bool LoopQueue : : SourceAddWithTimeout ( const AuSPtr < ILoopSource > & source , AuUInt32 ms )
2022-08-28 19:02:06 +00:00
{
AU_LOCK_GUARD ( this - > commitQueueMutex_ ) ;
return this - > SourceAddWithTimeoutEx ( source , ms ) ;
}
bool LoopQueue : : SourceAddWithTimeoutEx ( const AuSPtr < ILoopSource > & source , AuUInt32 ms )
2022-04-05 05:59:23 +00:00
{
2022-04-09 15:53:14 +00:00
this - > lockStealer_ . Set ( ) ;
2022-08-28 19:02:06 +00:00
#if 0
2022-04-09 15:53:14 +00:00
AU_LOCK_GUARD ( this - > sourceMutex_ - > AsWritable ( ) ) ;
2022-08-28 19:02:06 +00:00
# else
auto pWaitable = this - > sourceMutex_ - > AsWritable ( ) ;
auto pLocked = pWaitable - > TryLock ( ) ;
if ( pLocked )
# endif
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
this - > lockStealer_ . Reset ( ) ;
2022-04-09 15:53:14 +00:00
2022-08-28 19:02:06 +00:00
auto src = AuMakeShared < SourceExtended > ( this , source ) ;
if ( ! src )
{
pWaitable - > Unlock ( ) ;
return false ;
}
2022-04-09 15:53:14 +00:00
2022-08-28 19:02:06 +00:00
if ( ms )
{
2022-12-16 00:41:01 +00:00
src - > timeoutAbs = ( AuUInt64 ) ms + AuTime : : SteadyClockMS ( ) ;
2022-08-28 19:02:06 +00:00
}
if ( ! AuTryInsert ( this - > sources_ , src ) )
{
pWaitable - > Unlock ( ) ;
return false ;
}
this - > globalEpoll_ . Add ( src . get ( ) ) ;
pWaitable - > Unlock ( ) ;
return true ;
}
# if 1
else
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
return AuTryInsert ( this - > pendingBlocking_ , AuMakePair ( source , ms ) ) ;
2022-04-09 15:53:14 +00:00
}
2022-08-28 19:02:06 +00:00
# endif
2022-04-05 05:59:23 +00:00
}
bool LoopQueue : : SourceRemove ( const AuSPtr < ILoopSource > & source )
{
2022-04-09 15:53:14 +00:00
AU_LOCK_GUARD ( this - > commitQueueMutex_ ) ;
return AuTryInsert ( this - > decommitQueue_ , source ) ;
2022-04-05 05:59:23 +00:00
}
/**
 * @return number of entries currently held in sources_. No lock is taken
 *         for the read.
 */
AuUInt32 LoopQueue::GetSourceCount()
{
    return static_cast<AuUInt32>(this->sources_.size());
}
bool LoopQueue : : AddCallback ( const AuSPtr < ILoopSource > & source , const AuSPtr < ILoopSourceSubscriber > & subscriber )
{
2022-04-07 01:20:46 +00:00
AU_LOCK_GUARD ( this - > commitQueueMutex_ ) ;
2022-04-09 15:53:14 +00:00
return AuTryInsert ( this - > commitPending_ , AuMakeTuple ( source , subscriber , AuSPtr < ILoopSourceSubscriberEx > { } ) ) ;
2022-04-05 05:59:23 +00:00
}
bool LoopQueue : : AddCallbackEx ( const AuSPtr < ILoopSource > & source , const AuSPtr < ILoopSourceSubscriberEx > & subscriber )
{
2022-04-07 01:20:46 +00:00
AU_LOCK_GUARD ( this - > commitQueueMutex_ ) ;
2022-04-09 15:53:14 +00:00
return AuTryInsert ( this - > commitPending_ , AuMakeTuple ( source , AuSPtr < ILoopSourceSubscriber > { } , subscriber ) ) ;
2022-04-05 05:59:23 +00:00
}
bool LoopQueue : : AddCallback ( const AuSPtr < ILoopSourceSubscriber > & subscriber )
{
2022-04-07 01:20:46 +00:00
AU_LOCK_GUARD ( this - > globalLockMutex_ ) ;
return AuTryInsert ( this - > allSubscribers_ , subscriber ) ;
2022-04-05 05:59:23 +00:00
}
void LoopQueue : : ChugPathConfigure ( AuUInt32 sectionTickTime , AuSInt sectionDequeCount )
{
2022-04-07 01:20:46 +00:00
// Intentionally NO-OP under Linux
2022-04-05 05:59:23 +00:00
}
void LoopQueue : : ChugHint ( bool value )
{
2022-04-07 01:20:46 +00:00
// Intentionally NO-OP under Linux
2022-04-05 05:59:23 +00:00
}
2022-04-09 15:53:14 +00:00
bool LoopQueue : : CommitDecommit ( )
{
AuUInt32 dwSuccess { } ;
if ( this - > decommitQueue_ . empty ( ) )
{
return true ;
}
auto decommitQueue = AuExchange ( this - > decommitQueue_ , { } ) ;
for ( auto sourceExtended : sources_ )
{
bool bFound { } ;
for ( auto decommit : decommitQueue )
{
if ( decommit = = sourceExtended - > source )
{
bFound = true ;
break ;
}
}
if ( ! bFound )
{
continue ;
}
AU_LOCK_GUARD ( this - > polledItemsMutex_ - > AsReadable ( ) ) ;
for ( auto epoll : this - > alternativeEpolls_ )
{
epoll - > Remove ( sourceExtended . get ( ) , true , true ) ;
}
dwSuccess + + ;
}
2022-08-02 04:52:17 +00:00
// TODO (Reece): Urgent. Fails under an IO update dtor. Faking perfect unit tests until i make it. Need linux aurt.
//SysAssertDbg(dwSuccess == decommitQueue.size(), "caught SourceRemove on invalid");
2022-04-09 15:53:14 +00:00
return dwSuccess ;
}
2022-04-05 05:59:23 +00:00
bool LoopQueue : : Commit ( )
{
2022-04-09 15:53:14 +00:00
AU_LOCK_GUARD ( this - > commitQueueMutex_ ) ;
2022-08-28 19:02:06 +00:00
for ( const auto & [ pSource , ms ] : AuExchange ( this - > pendingBlocking_ , { } ) )
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
this - > SourceAddWithTimeoutEx ( pSource , ms ) ;
2022-04-09 15:53:14 +00:00
}
2022-08-28 19:02:06 +00:00
this - > lockStealer_ . Set ( ) ;
auto pWritable = this - > sourceMutex_ - > AsWritable ( ) ;
if ( pWritable - > TryLock ( ) )
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
this - > lockStealer_ . Reset ( ) ;
if ( ! CommitDecommit ( ) )
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
//pWritable->Unlock();
//return false;
}
auto pending = AuExchange ( this - > commitPending_ , { } ) ;
2022-04-09 15:53:14 +00:00
2022-08-28 19:02:06 +00:00
for ( auto & source : this - > sources_ )
{
for ( auto itr = pending . begin ( ) ; itr ! = pending . end ( ) ; )
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
if ( source - > source ! = AuGet < 0 > ( * itr ) )
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
itr + + ;
continue ;
2022-04-09 15:53:14 +00:00
}
2022-08-28 19:02:06 +00:00
auto a = AuGet < 1 > ( * itr ) ;
if ( a )
2022-04-09 15:53:14 +00:00
{
2022-08-28 19:02:06 +00:00
if ( ! AuTryInsert ( source - > subscribers , a ) )
{
this - > commitPending_ = AuMove ( this - > commitPending_ ) ;
pWritable - > Unlock ( ) ;
return false ;
}
2022-04-09 15:53:14 +00:00
}
2022-08-28 19:02:06 +00:00
auto b = AuGet < 2 > ( * itr ) ;
if ( b )
{
if ( ! AuTryInsert ( source - > subscriberExs , b ) )
{
// 1 and 2 are mutually exclusive, dont worry about clean up
this - > commitPending_ = AuMove ( this - > commitPending_ ) ;
pWritable - > Unlock ( ) ;
return false ;
}
}
itr = pending . erase ( itr ) ;
}
source - > Commit ( source ) ;
2022-04-09 15:53:14 +00:00
}
2022-08-28 19:02:06 +00:00
pWritable - > Unlock ( ) ;
}
else
{
this - > bRecommitLater = true ;
2022-04-09 15:53:14 +00:00
}
2022-04-05 10:11:19 +00:00
return true ;
2022-04-05 05:59:23 +00:00
}
2022-04-10 15:40:49 +00:00
bool LoopQueue : : IsSignaledPeek ( )
2022-04-05 05:59:23 +00:00
{
2022-04-09 15:53:14 +00:00
fd_set readSet ;
struct timeval tv { } ;
FD_ZERO ( & readSet ) ;
FD_SET ( this - > epollFd_ , & readSet ) ;
auto active = select ( this - > epollFd_ + 1 , & readSet , NULL , NULL , & tv ) ;
if ( active = = - 1 )
{
// todo push error
return false ;
}
return active = = 1 ;
}
2022-04-12 19:16:49 +00:00
// This could be implemented more like a possible BSD implementation
// if we were to implement based on io_submit poll
2022-04-09 15:53:14 +00:00
bool LoopQueue : : WaitAll ( AuUInt32 timeoutIn )
{
AnEpoll epollReference ;
{
AU_LOCK_GUARD ( this - > globalEpoll_ . lock ) ;
epollReference = this - > globalEpoll_ ;
}
epollReference . lock = { } ;
AuUInt64 timeout { timeoutIn } ;
if ( timeout )
{
2022-12-16 00:41:01 +00:00
timeout + = AuTime : : SteadyClockMS ( ) ;
2022-04-09 15:53:14 +00:00
}
{
AU_LOCK_GUARD ( this - > polledItemsMutex_ - > AsWritable ( ) ) ;
AuTryInsert ( this - > alternativeEpolls_ , & epollReference ) ;
}
bool anythingLeft { } ;
bool bTimeout { } ;
do
{
anythingLeft = epollReference . startingWorkRead . size ( ) | | epollReference . startingWorkWrite . size ( ) ;
if ( ! anythingLeft ) return true ;
//WaitAny(0);
// [==========] 1 test from 1 test suite ran. (11100 ms total)
// ...and a turbojet
//bool bTryAgain {};
//DoTick(timeout, {}, &bTryAgain);
// ...and + ~10ms latency
//bool bTryAgain {};
//DoTick(AuMin(AuUInt64(AuTime::CurrentClockMS() + 4), timeout), {}, &bTryAgain);
// [----------] 1 test from Loop (11101 ms total)
// ...and no jet engine (+ lower latency than windows)
bool bTryAgain { } ;
DoTick ( timeout , { } , & bTryAgain ) ;
2022-08-09 06:48:29 +00:00
PumpHooks ( ) ;
2022-04-09 15:53:14 +00:00
// but this hack should apply to wait any as well, so i'm moving it to the DoTick function
anythingLeft = epollReference . startingWorkRead . size ( ) | | epollReference . startingWorkWrite . size ( ) ;
2022-12-16 00:41:01 +00:00
bTimeout = timeout ? AuTime : : SteadyClockMS ( ) > = timeout : false ;
2022-04-09 15:53:14 +00:00
} while ( anythingLeft & & ! bTimeout ) ;
{
AU_LOCK_GUARD ( this - > polledItemsMutex_ - > AsWritable ( ) ) ;
SysAssert ( AuTryRemove ( this - > alternativeEpolls_ , & epollReference ) ) ;
}
return ! anythingLeft ;
}
/**
 * Ticks the queue until DoTick() stops asking for a retry.
 *
 * @param timeoutIn relative timeout in milliseconds; zero means no deadline
 * @return sum of the values returned by DoTick() across all iterations
 */
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeoutIn)
{
    AuUInt64 uDeadline = timeoutIn;
    if (uDeadline)
    {
        uDeadline += AuTime::SteadyClockMS();
    }

    AuUInt32 uTotal {};
    for (;;)
    {
        bool bAgain = false;

        const AuUInt32 uThisPass = DoTick(uDeadline, {}, &bAgain);
        PumpHooks();
        uTotal += uThisPass;

        if (!bAgain)
        {
            break;
        }
    }

    return uTotal;
}
2022-04-10 15:40:49 +00:00
/**
 * Non-blocking counterpart of WaitAny(): ticks with no deadline and the
 * nonblock flag set, repeating while DoTick() asks for a retry.
 *
 * @return sum of the values returned by DoTick() across all iterations
 */
AuUInt32 LoopQueue::PumpNonblocking()
{
    AuUInt32 uTotal {};

    for (;;)
    {
        bool bAgain = false;

        const AuUInt32 uThisPass = DoTick(0, {}, &bAgain, true);
        PumpHooks();
        uTotal += uThisPass;

        if (!bAgain)
        {
            break;
        }
    }

    return uTotal;
}
/**
 * Like PumpNonblocking(), but collects the sources that ticked.
 *
 * @return list filled by DoTick() with every source whose work ticked
 */
AuList<AuSPtr<ILoopSource>> LoopQueue::PumpNonblockingEx()
{
    AuList<AuSPtr<ILoopSource>> signaled;

    for (;;)
    {
        bool bAgain = false;

        DoTick(0, &signaled, &bAgain, true);
        PumpHooks();

        if (!bAgain)
        {
            break;
        }
    }

    return signaled;
}
2022-04-09 15:53:14 +00:00
/**
 * Like WaitAny(), but collects the sources that ticked.
 *
 * @param timeoutIn relative timeout in milliseconds; zero means no deadline
 * @return list filled by DoTick() with every source whose work ticked
 */
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeoutIn)
{
    AuList<AuSPtr<ILoopSource>> signaled;

    AuUInt64 uDeadline = timeoutIn;
    if (uDeadline)
    {
        uDeadline += AuTime::SteadyClockMS();
    }

    for (;;)
    {
        bool bAgain = false;

        DoTick(uDeadline, &signaled, &bAgain);
        PumpHooks();

        if (!bAgain)
        {
            break;
        }
    }

    return signaled;
}
2022-04-09 15:53:14 +00:00
void LoopQueue : : AnEpoll : : Remove ( SourceExtended * source , bool readData , bool writeData )
2022-04-05 05:59:23 +00:00
{
2022-04-09 15:53:14 +00:00
if ( ! source - > sourceExtended )
{
return ;
}
auto ex = source - > sourceExtended ;
AU_LOCK_GUARD ( this - > lock ) ;
bool bIsRoot = this = = & this - > parent - > globalEpoll_ ;
if ( readData )
{
for ( auto i = startingWorkRead . begin ( ) ; i ! = startingWorkRead . end ( ) ; )
{
bool doesntMatch { } ;
auto & fd = i - > first ;
auto & usage = i - > second ;
if ( ex - > Singular ( ) )
{
doesntMatch = fd ! = ex - > GetHandle ( ) ;
}
else
{
doesntMatch = ! AuExists ( ex - > GetHandles ( ) , fd ) ;
}
if ( doesntMatch )
{
i + + ;
continue ;
}
if ( ( - - ( usage ) ) ! = 0 )
{
i + + ;
continue ;
}
if ( bIsRoot )
{
if ( startingWorkWrite . find ( fd ) = = startingWorkWrite . end ( ) )
{
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_DEL , fd , nullptr ) ;
}
else
{
epoll_event event ;
event . events = EPOLLOUT ;
event . data . ptr = source ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_MOD , fd , & event ) ;
}
}
i = startingWorkRead . erase ( i ) ;
}
}
if ( writeData )
{
for ( auto i = startingWorkWrite . begin ( ) ; i ! = startingWorkWrite . end ( ) ; )
{
bool doesntMatch { } ;
auto & fd = i - > first ;
auto & usage = i - > second ;
if ( ex - > Singular ( ) )
{
doesntMatch = fd ! = ex - > GetWriteHandle ( ) ;
}
else
{
doesntMatch = ! AuExists ( ex - > GetWriteHandles ( ) , fd ) ;
}
if ( doesntMatch )
{
i + + ;
continue ;
}
if ( ( - - ( usage ) ) ! = 0 )
{
i + + ;
continue ;
}
if ( bIsRoot )
{
if ( startingWorkRead . find ( fd ) = = startingWorkRead . end ( ) )
{
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_DEL , fd , nullptr ) ;
}
else
{
epoll_event event ;
event . events = EPOLLIN ;
event . data . ptr = source ;
epoll_ctl ( this - > parent - > epollFd_ , EPOLL_CTL_MOD , fd , & event ) ;
}
}
i = startingWorkWrite . erase ( i ) ;
}
}
2022-04-05 05:59:23 +00:00
}
2022-04-10 15:40:49 +00:00
/**
 * Runs one wait/dispatch pass over the queue.
 *
 * Sequence:
 *  1. call OnPresleep() on every extended source,
 *  2. wait on the epoll descriptor via LinuxOverlappedEpollShim,
 *  3. dispatch each returned event to its SourceExtended::DoWork(),
 *     removing sources that ask for it,
 *  4. evict sources whose timeout expired and call OnFinishSleep().
 *
 * The whole pass holds the sources_ read lock; it is upgraded to a write
 * lock around each erase.
 * NOTE(review): the result of UpgradeReadToWrite(0) is not checked - confirm
 *               that it cannot fail.
 *
 * Fix over the previous revision: the shared pointer obtained from
 * `pin.lock()` is now checked before use; an expired pin used to be
 * dereferenced.
 *
 * @param time     absolute steady-clock deadline in ms, or zero for none.
 *                 With a deadline, a single wait is capped at 4 ms.
 * @param optOut   optional list receiving every source that ticked
 * @param tryAgain optional; written only when nothing ticked, to
 *                 "lockStealer_ is signaled or the deadline has not passed"
 * @param nonblock when there is no deadline: true polls, false blocks
 * @return accumulated `ticked` results of the DoWork() calls
 */
AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList<AuSPtr<ILoopSource>> *optOut, bool *tryAgain, bool nonblock)
{
    AuUInt32 bTicked {};
    AuUInt64 now {};
    epoll_event events[128];

    AU_LOCK_GUARD(this->sourceMutex_->AsReadable());

    for (const auto &source : this->sources_)
    {
        if (source->sourceExtended)
        {
            source->sourceExtended->OnPresleep();
        }
    }

    AuInt64 deltaMS = 0;
    if (time)
    {
        deltaMS = AuMin(AuInt64(4), (AuInt64)time - (AuInt64)AuTime::SteadyClockMS());
        if (deltaMS < 0)
        {
            deltaMS = 0;
        }
    }
    else
    {
        deltaMS = nonblock ? 0 : -1;
    }

    int iEvents = IO::UNIX::LinuxOverlappedEpollShim(this->epollFd_, events, AuArraySize(events), deltaMS);
    if (iEvents == -1)
    {
        // Skip dispatch but still run the timeout/OnFinishSleep pass.
        goto out;
    }

    for (int i = 0; i < iEvents; i++)
    {
        bool readData  = events[i].events & EPOLLIN;
        bool writeData = events[i].events & EPOLLOUT;

        auto handle = events[i].data.ptr;
        if (!handle)
        {
            continue;
        }

        auto base = AuReinterpretCast<SourceExtended *>(handle);
        if (!base->bHasCommited)
        {
            continue;
        }

        auto source = base->pin.lock();
        if (!source)
        {
            // The pin has been reset; there is no live object to dispatch to.
            continue;
        }

        auto [ticked, remove, noworkers] = source->DoWork(readData, writeData);
        bTicked += ticked;

        if (ticked)
        {
            if (optOut)
            {
                optOut->push_back(source->source);
            }
        }

        if (remove)
        {
            this->sourceMutex_->UpgradeReadToWrite(0);
            AuTryRemove(this->sources_, source);
            this->sourceMutex_->DowngradeWriteToRead();

            AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
            for (auto epoll : this->alternativeEpolls_)
            {
                epoll->Remove(source.get(), readData, writeData);
            }
        }

        // Fire waitall
        // not sure i like how this fires all anys and alls.
        // this isnt consistent
        if (noworkers)
        {
            AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
            for (auto epoll : this->alternativeEpolls_)
            {
                // Everything except the root view.
                if (epoll != &this->globalEpoll_)
                {
                    epoll->Remove(source.get(), readData, writeData);
                }
            }
        }
    }

    now = AuTime::SteadyClockMS();

    if (!bTicked)
    {
        if (tryAgain)
        {
            *tryAgain = ((this->lockStealer_.IsSignaled()) ||
                         (now < time));
        }
    }

out:
    if (!now)
    {
        now = AuTime::SteadyClockMS();
    }

    for (auto itr = this->sources_.begin(); itr != this->sources_.end(); )
    {
        // Hold a strong reference: the entry may be erased below.
        AuSPtr<SourceExtended> source = *itr;

        const bool remove = source->ConsiderTimeout(now);
        if (remove)
        {
            this->sourceMutex_->UpgradeReadToWrite(0);
            itr = this->sources_.erase(itr);
            this->sourceMutex_->DowngradeWriteToRead();

            AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
            for (auto epoll : this->alternativeEpolls_)
            {
                epoll->Remove(source.get(), true, true);
            }
        }

        if (source->sourceExtended)
        {
            source->sourceExtended->OnFinishSleep();
        }

        if (!remove)
        {
            itr++;
        }
    }

    return bTicked;
}
/**
 * Wraps a loop source for tracking inside `parent`.
 *
 * sourceExtended is set to the source's ILoopSourceEx interface, or null
 * when the source does not implement it.
 *
 * @param parent owning loop queue
 * @param source loop source being wrapped
 */
LoopQueue::SourceExtended::SourceExtended(LoopQueue *parent, const AuSPtr<ILoopSource> &source) :
    parent(parent),
    source(source)
{
    auto pRaw = source.get();
    this->sourceExtended = AuDynamicCast<ILoopSourceEx>(pRaw);
}
/**
 * Releases the self pin via Deinit().
 */
LoopQueue::SourceExtended::~SourceExtended()
{
    this->Deinit();
}
2022-04-09 15:53:14 +00:00
void LoopQueue : : SourceExtended : : Deinit ( )
2022-04-05 05:59:23 +00:00
{
2022-04-09 15:53:14 +00:00
this - > pin . reset ( ) ;
}
void LoopQueue : : SourceExtended : : Commit ( const AuSPtr < SourceExtended > & self )
{
this - > pin = self ;
this - > bHasCommited = true ;
}
2022-04-17 22:46:05 +00:00
/**
 * Dispatches an epoll event to the fd-specific DoWork(int) overload.
 *
 * Sources without an ILoopSourceEx interface, and multi-handle sources, are
 * serviced once with fd = -1. Singular sources are serviced once per
 * signaled direction with the matching handle.
 *
 * @param read  the event carried EPOLLIN
 * @param write the event carried EPOLLOUT
 * @return { ticked, remove, third flag }: for the singular path the first
 *         two are OR-ed and the third AND-ed (starting from true) over the
 *         calls made; otherwise the result of DoWork(-1) unchanged
 */
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(bool read, bool write)
{
    if ((!this->sourceExtended) || (!this->sourceExtended->Singular()))
    {
        // Whatever, I doubt implementing this is worth the perf hit
        return DoWork(-1);
    }

    bool bAnyTicked {};
    bool bAnyRemove {};
    bool bAllNoWorkers { true };

    if (read)
    {
        auto [bTicked, bRemove, bNoWorkers] = DoWork(this->sourceExtended->GetHandle());
        bAnyTicked    |= bTicked;
        bAnyRemove    |= bRemove;
        bAllNoWorkers &= bNoWorkers;
    }

    if (write)
    {
        auto [bTicked, bRemove, bNoWorkers] = DoWork(this->sourceExtended->GetWriteHandle());
        bAnyTicked    |= bTicked;
        bAnyRemove    |= bRemove;
        bAllNoWorkers &= bNoWorkers;
    }

    return AuMakeTuple(bAnyTicked, bAnyRemove, bAllNoWorkers);
}
2022-04-07 01:20:46 +00:00
2022-04-17 22:46:05 +00:00
/**
 * Services one trigger of this source and notifies its subscribers.
 *
 * Subscribers that return true from OnFinished() are dropped from their
 * list. The source is proposed for removal only if every per-source
 * subscriber returned true; queue-wide subscribers may then veto it.
 *
 * Fix over the previous revision: `result` was read uninitialised when a
 * subscriber's OnFinished() threw. A throwing subscriber now counts as
 * "not finished": it is kept and the source is not removed.
 *
 * @param fd handle forwarded to ILoopSourceEx::OnTrigger(); -1 when the
 *           caller has no specific handle
 * @return { ticked, shouldRemove, hadNoSubscribers }. All false when the
 *         entry is not committed or OnTrigger() returns false.
 */
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(int fd)
{
    bool bShouldRemove { true };
    AuUInt8 uPosition {};

    if (!this->bHasCommited)
    {
        return {};
    }

    if (this->sourceExtended)
    {
        if (!this->sourceExtended->OnTrigger(fd))
        {
            return {};
        }
    }

    // True when the source started this call without any per-source subscriber.
    const bool bOverload = (this->subscribers.empty()) &&
                           (this->subscriberExs.empty());

    // Notify callbacks...
    for (auto itr = this->subscriberExs.begin();
         itr != this->subscriberExs.end(); )
    {
        bool result {};
        auto handler = *itr;

        try
        {
            result = handler->OnFinished(this->source, uPosition++);
        }
        catch (...)
        {
            SysPushErrorCatch();
        }

        bShouldRemove &= result;

        if (result)
        {
            itr = this->subscriberExs.erase(itr);
        }
        else
        {
            itr++;
        }
    }

    for (auto itr = this->subscribers.begin();
         itr != this->subscribers.end(); )
    {
        bool result {};
        auto handler = *itr;

        try
        {
            result = handler->OnFinished(this->source);
        }
        catch (...)
        {
            SysPushErrorCatch();
        }

        bShouldRemove &= result;

        if (result)
        {
            itr = this->subscribers.erase(itr);
        }
        else
        {
            itr++;
        }
    }

    // Evict when subs count hit zero, not when sub count started off at zero
    if (bOverload)
    {
        bShouldRemove = false;
    }

    // Notify global subscribers, allowing them to preempt removal
    if (bShouldRemove || bOverload)
    {
        AU_LOCK_GUARD(this->parent->globalLockMutex_);
        for (const auto &handler : this->parent->allSubscribers_)
        {
            try
            {
                bShouldRemove &= handler->OnFinished(this->source);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }
        }
    }

    return AuMakeTuple(true, bShouldRemove, bOverload);
}
2022-08-02 04:52:17 +00:00
bool LoopQueue : : AddHook ( const AuFunction < void ( ) > & func )
2022-07-06 02:40:09 +00:00
{
return AuTryInsert ( this - > epilogueHooks_ , func ) ;
}
2022-08-02 04:52:17 +00:00
void LoopQueue : : PumpHooks ( )
{
2022-08-28 19:02:06 +00:00
if ( AuExchange ( this - > bRecommitLater , false ) )
{
this - > Commit ( ) ;
}
2022-08-02 04:52:17 +00:00
auto c = AuExchange ( this - > epilogueHooks_ , { } ) ;
for ( auto & a : c )
{
a ( ) ;
}
}
2022-04-07 01:20:46 +00:00
/**
 * Factory for the Linux loop queue.
 *
 * @return an initialised queue, or an empty pointer if allocation or Init()
 *         failed
 */
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
{
    auto pQueue = AuMakeShared<LoopQueue>();

    if (pQueue && pQueue->Init())
    {
        return pQueue;
    }

    return {};
}
2022-04-05 05:59:23 +00:00
}