From cc01b0557a7bbd56e921e07c80b02341a72856ba Mon Sep 17 00:00:00 2001 From: J Reece Wilson Date: Thu, 1 Aug 2024 12:55:08 +0100 Subject: [PATCH] [*] Improve ILoopQueue::WaitAll parity (been on the todo list for, idk, 3 years) ---
// Review notes. They sit after the three-dash separator, i.e. in the free-form notes area of a
// format-patch mail, so the diff payload below is untouched.
//
// What the hunks below show:
//  - LoopQueue::WaitAll sets epollReference.bAll and calls DoTick(timeout, {}, &bTryAgain, false, true,
//    &epollReference). After epollReference is removed from alternativeEpolls_, it walks
//    epollReference.done while holding sourceMutex_ as writable: if anythingLeft is set, each entry is
//    passed to ResetLoopSourceFalseAlarm(pSource->source) and re-added; otherwise DoWorkLate() runs and
//    the entry is re-added only when its `remove` result is false. Re-adding means AuTryInsert into
//    sources_ plus epoll->Add() on every entry of alternativeEpolls_.
//  - LoopQueue::DoTick gains `all` and `pAllEpoll`. In the `all` branch a source is skipped unless
//    CanDispatchLayer(readData, writeData) is true; otherwise it is removed from sources_ and from the
//    alternative epolls, bTicked is incremented, and the source is pushed onto pAllEpoll->done. The
//    previous DoWork-based path is kept in the else branch.
//  - SourceExtended::DoWork(int fd) is split: CanDispatchLayer(int fd) takes the early-out checks, and
//    DoWork(int fd) returns an empty tuple when that check fails, else the result of DoWorkLate().
//  - Header: DoTick gets two defaulted trailing parameters, AnEpoll is forward-declared in a private
//    section ahead of it, AnEpoll gains `bAll` and `done`, and SourceExtended gains DoWorkLate() and
//    two CanDispatchLayer overloads.
//
// NOTE(review): pAllEpoll defaults to nullptr in the header, but the `all` branch of DoTick dereferences
//    it unconditionally (pAllEpoll->done.push_back(source)). The only call with all=true shown here
//    passes &epollReference; confirm that no other caller can pass all=true without an epoll.
// NOTE(review): the `all` branch runs two consecutive loops over alternativeEpolls_ that both call
//    epoll->Remove(source.get(), readData, writeData). The second repeats the first for every epoll
//    except globalEpoll_. It looks like the `remove` and `noworkers` paths of the else branch were
//    both copied in unconditionally; confirm whether the second loop is intended.
// NOTE(review): CanDispatchLayer(bool read, bool write) declares `ret` and `bSingleOnlyFlag` in its
//    Singular() branch and never reads either of them.
// NOTE(review): the result of AuTryInsert(this->sources_, AuConstReference(pSource)) in the WaitAll
//    re-add path is ignored, while the epoll->Add() calls that follow run regardless.
// NOTE(review): this copy of the patch appears to have lost its line breaks, indentation and template
//    argument lists (see `AuList> *optOut`), so it presumably cannot be applied as-is; verify against
//    the original commit before use.
Source/IO/Loop/LoopQueue.Linux.cpp | 213 ++++++++++++++++++++++++----- Source/IO/Loop/LoopQueue.Linux.hpp | 11 +- 2 files changed, 188 insertions(+), 36 deletions(-) diff --git a/Source/IO/Loop/LoopQueue.Linux.cpp b/Source/IO/Loop/LoopQueue.Linux.cpp index 203ea015..4bfc950e 100644 --- a/Source/IO/Loop/LoopQueue.Linux.cpp +++ b/Source/IO/Loop/LoopQueue.Linux.cpp @@ -21,6 +21,8 @@ namespace Aurora::IO::Loop { + void ResetLoopSourceFalseAlarm(const AuSPtr &pLoopSource); + // On Linux, Loop Queues are glorified eventfd to epoll adapters. // Requeuing the cached fd array per frame on the TLS io_submit object // would be more costly than maintaining an epoll for all fds tracked @@ -295,6 +297,8 @@ namespace Aurora::IO::Loop return true; } + AU_DEBUG_MEMCRUNCH; + auto decommitQueue = AuExchange(this->decommitQueue_, {}); for (auto sourceExtended : sources_) @@ -333,6 +337,8 @@ namespace Aurora::IO::Loop { AU_LOCK_GUARD(this->commitQueueMutex_); + AU_DEBUG_MEMCRUNCH; + for (const auto & [pSource, ms] : AuExchange(this->pendingBlocking_, {})) { this->SourceAddWithTimeoutEx(pSource, ms); @@ -368,7 +374,7 @@ namespace Aurora::IO::Loop if (!AuTryInsert(source->subscribers, a)) { this->commitPending_ = AuMove(this->commitPending_); - pWritable->Unlock(); + pWritable->Unlock(); return false; } } @@ -380,7 +386,7 @@ namespace Aurora::IO::Loop { // 1 and 2 are mutually exclusive, dont worry about clean up this->commitPending_ = AuMove(this->commitPending_); - pWritable->Unlock(); + pWritable->Unlock(); return false; } } @@ -422,12 +428,15 @@ namespace Aurora::IO::Loop // if we were to implement based on io_submit poll bool LoopQueue::WaitAll(AuUInt32 timeoutIn) { + AU_DEBUG_MEMCRUNCH; + AnEpoll epollReference; { AU_LOCK_GUARD(this->globalEpoll_.lock); 
epollReference = this->globalEpoll_; } epollReference.lock = {}; + epollReference.bAll = true; AuUInt64 timeout {timeoutIn}; if (timeout) @@ -462,7 +471,7 @@ namespace Aurora::IO::Loop bool bTryAgain {}; - DoTick(timeout, {}, &bTryAgain); + DoTick(timeout, {}, &bTryAgain, false, true, &epollReference); PumpHooks(); // but this hack should apply to wait any as well, so i'm moving it to the DoTick function @@ -475,6 +484,47 @@ namespace Aurora::IO::Loop SysAssert(AuTryRemove(this->alternativeEpolls_, &epollReference)); } + { + AU_LOCK_GUARD(this->sourceMutex_->AsWritable()); + for (auto & pSource : epollReference.done) + { + bool bReAdd { false }; + + if (anythingLeft) + { + bReAdd = true; + ResetLoopSourceFalseAlarm(pSource->source); + } + else + { + auto [ticked, remove, noworkers] = pSource->DoWorkLate(); + + if (!remove) + { + bReAdd = true; + } + } + + if (!bReAdd) + { + continue; + } + + { + AuTryInsert(this->sources_, AuConstReference(pSource)); + } + + { + + AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); + for (auto epoll : this->alternativeEpolls_) + { + epoll->Add(pSource.get()); + } + } + } + } + return !anythingLeft; } @@ -487,6 +537,8 @@ namespace Aurora::IO::Loop timeout += AuTime::SteadyClockMS(); } + AU_DEBUG_MEMCRUNCH; + AuUInt32 cTicked {}; bool bTryAgain {}; do @@ -661,7 +713,7 @@ namespace Aurora::IO::Loop } } - AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList> *optOut, bool *tryAgain, bool nonblock) + AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList> *optOut, bool *tryAgain, bool nonblock, bool all, AnEpoll *pAllEpoll) { AuUInt32 bTicked {}; AuUInt64 now {}; @@ -716,46 +768,85 @@ namespace Aurora::IO::Loop auto source = base->pin.lock(); - auto [ticked, remove, noworkers] = source->DoWork(readData, writeData); - bTicked += ticked; - - if (ticked) + if (all) { - if (optOut) + bool bTriggered = source->CanDispatchLayer(readData, writeData); + if (!bTriggered) { - optOut->push_back(source->source); + continue; } - } - if (remove) - { - 
this->sourceMutex_->UpgradeReadToWrite(0); - AuTryRemove(this->sources_, source); - this->sourceMutex_->DowngradeWriteToRead(); - } - - if (remove) - { - AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); - for (auto epoll : this->alternativeEpolls_) { - epoll->Remove(source.get(), readData, writeData); + this->sourceMutex_->UpgradeReadToWrite(0); + AuTryRemove(this->sources_, source); + this->sourceMutex_->DowngradeWriteToRead(); } - } - // Fire waitall - // not sure i like how this fires all anys and alls. - // this isnt consistent - if (noworkers) - { - AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); - for (auto epoll : this->alternativeEpolls_) { - if (epoll != &this->globalEpoll_) + AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); + for (auto epoll : this->alternativeEpolls_) { epoll->Remove(source.get(), readData, writeData); } } + + { + AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); + for (auto epoll : this->alternativeEpolls_) + { + if (epoll != &this->globalEpoll_) + { + epoll->Remove(source.get(), readData, writeData); + } + } + } + + bTicked += 1; + pAllEpoll->done.push_back(source); + } + else + { + auto [ticked, remove, noworkers] = source->DoWork(readData, writeData); + bTicked += ticked; + + if (ticked) + { + if (optOut) + { + optOut->push_back(source->source); + } + } + + if (remove) + { + this->sourceMutex_->UpgradeReadToWrite(0); + AuTryRemove(this->sources_, source); + this->sourceMutex_->DowngradeWriteToRead(); + } + + if (remove) + { + AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); + for (auto epoll : this->alternativeEpolls_) + { + epoll->Remove(source.get(), readData, writeData); + } + } + + // Fire waitall + // not sure i like how this fires all anys and alls. 
+ // this isnt consistent + if (noworkers) + { + AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); + for (auto epoll : this->alternativeEpolls_) + { + if (epoll != &this->globalEpoll_) + { + epoll->Remove(source.get(), readData, writeData); + } + } + } } } @@ -879,11 +970,46 @@ namespace Aurora::IO::Loop } } - AuTuple LoopQueue::SourceExtended::DoWork(int fd) + bool LoopQueue::SourceExtended::CanDispatchLayer(bool read, bool write) { - bool bShouldRemove {true}; - AuUInt8 uPosition {}; + if (!this->sourceExtended) + { + return false; + } + if (this->sourceExtended->Singular()) + { + AuPair ret; + + bool bSingleOnlyFlag {true}; + + if (read) + { + if (CanDispatchLayer(this->sourceExtended->GetHandle())) + { + return true; + } + } + + if (write) + { + if (CanDispatchLayer(this->sourceExtended->GetWriteHandle())) + { + return true; + } + } + + return false; + } + else + { + // Whatever, I doubt implementing this is worth the perf hit + return CanDispatchLayer(-1); + } + } + + bool LoopQueue::SourceExtended::CanDispatchLayer(int fd) + { if (!this->bHasCommited) { return {}; @@ -897,6 +1023,23 @@ namespace Aurora::IO::Loop } } + return true; + } + + AuTuple LoopQueue::SourceExtended::DoWork(int fd) + { + if (!CanDispatchLayer(fd)) + { + return {}; + } + + return DoWorkLate(); + } + + AuTuple LoopQueue::SourceExtended::DoWorkLate() + { + bool bShouldRemove {true}; + AuUInt8 uPosition {}; bool bOverload {}; if ((this->subscribers.empty()) && diff --git a/Source/IO/Loop/LoopQueue.Linux.hpp b/Source/IO/Loop/LoopQueue.Linux.hpp index 95f0cccc..abc168b5 100644 --- a/Source/IO/Loop/LoopQueue.Linux.hpp +++ b/Source/IO/Loop/LoopQueue.Linux.hpp @@ -46,7 +46,11 @@ namespace Aurora::IO::Loop AuUInt32 WaitAny(AuUInt32 timeout) override; AuList> WaitAnyEx(AuUInt32 timeout) override; - AuUInt32 DoTick(AuUInt64, AuList> *optOut = nullptr, bool *tryAgain = nullptr, bool async = false); + private: + struct AnEpoll; + + public: + AuUInt32 DoTick(AuUInt64, AuList> *optOut = nullptr, 
bool *tryAgain = nullptr, bool async = false, bool all = false, AnEpoll *pAllEpoll = nullptr); private: @@ -97,15 +101,20 @@ namespace Aurora::IO::Loop // ticked, should remove AuTuple DoWork(int fd); AuTuple DoWork(bool read, bool write); + AuTuple DoWorkLate(); + bool CanDispatchLayer(int fd); + bool CanDispatchLayer(bool read, bool write); }; struct AnEpoll { LoopQueue *parent {}; + bool bAll {}; AuThreadPrimitives::SpinLock lock; AuBST startingWorkRead; AuBST startingWorkWrite; + AuList> done; void Add(SourceExtended *source); void Remove(SourceExtended *source, bool readData, bool writeData);