/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: LoopQueue.Linux.cpp Date: 2022-4-5 Author: Reece ***/ #include #include "Loop.NT.hpp" #include "ILoopSourceEx.hpp" #include "LoopQueue.Linux.hpp" #include #include #include namespace Aurora::IO::Loop { // On Linux, Loop Queues are glorified eventfd to epoll adapters. // Requeuing the cached fd array per frame on the TLS io_submit object // would be more costly than maintaining an epoll for all fds tracked // by the loop queue. // We can delegate the wait functions to an NT overlapped like shim // where all eventfds are one epoll handle // The TLS worker would get awoken by any changes in the epoll queue // or if the io submit object should preemptively abort // The TLS worker would remain resposible for scheduling thread local // network and file transactions independent from the loop queues // As such, loop queues continue to be defined as a mechanism to merely // wait, not dispatch/manage work // Delegating mutex reads to a single io_submit would be a linux-specific // kevent-non-reusable ThreadWorkerQueueShim hack // ...it wouldn't make sense create another loop queue per thread concept // outside of the async subsystem (not counting TLS overlapped io) LoopQueue::LoopQueue() : lockStealer_(false, false, true) { } LoopQueue::~LoopQueue() { Deinit(); } bool LoopQueue::Init() { this->epollFd_ = epoll_create1(EPOLL_CLOEXEC); if (this->epollFd_ == -1) { return false; } this->sourceMutex_ = AuThreadPrimitives::RWLockUnique(); if (!this->sourceMutex_) { return false; } this->polledItemsMutex_ = AuThreadPrimitives::RWLockUnique(); if (!this->polledItemsMutex_) { return false; } this->globalEpoll_.parent = this; return AuTryInsert(this->alternativeEpolls_, &this->globalEpoll_); } void LoopQueue::Deinit() { int fd; if ((fd = AuExchange(this->epollFd_, -1)) != -1) { ::close(fd); } } void LoopQueue::AnEpoll::Add(SourceExtended *source) { epoll_event event; auto ex = source->sourceExtended; if (!ex) { return; } event.data.ptr = source; if (ex->Singular()) { bool bDouble {}; int oldReadRef {}; int oldWriteRef {}; auto read = ex->GetHandle(); if (read != -1) { oldReadRef = startingWorkRead[read]++; bDouble |= startingWorkWrite.find(read) != startingWorkWrite.end(); } auto write = ex->GetWriteHandle(); if (write != -1) { oldWriteRef = startingWorkWrite[write]++; bDouble |= startingWorkRead.find(write) != startingWorkRead.end(); } if (bDouble) { epoll_event event; event.events = EPOLLOUT | EPOLLIN; event.data.ptr = source; if ((oldReadRef == 0) && (oldWriteRef == 0)) { epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, write, &event); } else { epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, write, &event); } } if ((write != -1) && (!oldWriteRef)) { event.events = EPOLLOUT; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, write, &event); } if ((read != -1) && (!oldReadRef)) { event.events = EPOLLIN; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, read, &event); } } else { auto read = ex->GetHandles(); auto write = ex->GetWriteHandles(); for (auto readHandle : read) { auto count = startingWorkRead[readHandle]++; if (count) { continue; } if (AuExists(write, readHandle)) { continue; } event.events = EPOLLIN; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, readHandle, &event); } for (auto writeHandle : write) { auto count = startingWorkWrite[writeHandle]++; if (count) { if (AuExists(read, writeHandle)) { event.events = EPOLLOUT | EPOLLIN; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, writeHandle, &event); } continue; } if (AuExists(read, writeHandle)) { event.events = EPOLLOUT | EPOLLIN; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, writeHandle, &event); } else { event.events = EPOLLOUT; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, writeHandle, &event); } } } } bool LoopQueue::SourceAdd(const AuSPtr &source) { return SourceAddWithTimeout(source, 0); } bool LoopQueue::SourceAddWithTimeout(const AuSPtr &source, AuUInt32 ms) { AU_LOCK_GUARD(this->commitQueueMutex_); return this->SourceAddWithTimeoutEx(source, ms); } bool LoopQueue::SourceAddWithTimeoutEx(const AuSPtr &source, AuUInt32 ms) { this->lockStealer_.Set(); #if 0 AU_LOCK_GUARD(this->sourceMutex_->AsWritable()); #else auto pWaitable = this->sourceMutex_->AsWritable(); auto pLocked = pWaitable->TryLock(); if (pLocked) #endif { this->lockStealer_.Reset(); auto src = AuMakeShared(this, source); if (!src) { pWaitable->Unlock(); return false; } if (ms) { src->timeoutAbs = (AuUInt64)ms + AuTime::SteadyClockMS(); } if (!AuTryInsert(this->sources_, src)) { pWaitable->Unlock(); return false; } this->globalEpoll_.Add(src.get()); pWaitable->Unlock(); return true; } #if 1 else { return AuTryInsert(this->pendingBlocking_, AuMakePair(source, ms)); } #endif } bool LoopQueue::SourceRemove(const AuSPtr &source) { AU_LOCK_GUARD(this->commitQueueMutex_); return AuTryInsert(this->decommitQueue_, source); } AuUInt32 LoopQueue::GetSourceCount() { return this->sources_.size(); } bool LoopQueue::AddCallback(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->commitQueueMutex_); return AuTryInsert(this->commitPending_, AuMakeTuple(source, subscriber, AuSPtr{})); } bool LoopQueue::AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->commitQueueMutex_); return AuTryInsert(this->commitPending_, AuMakeTuple(source, AuSPtr{}, subscriber)); } bool LoopQueue::AddCallback(const AuSPtr &subscriber) { AU_LOCK_GUARD(this->globalLockMutex_); return AuTryInsert(this->allSubscribers_, subscriber); } void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) { // Intentionally NO-OP under Linux } void LoopQueue::ChugHint(bool value) { // Intentionally NO-OP under Linux } bool LoopQueue::CommitDecommit() { AuUInt32 dwSuccess {}; if (this->decommitQueue_.empty()) { return true; } auto decommitQueue = AuExchange(this->decommitQueue_, {}); for (auto sourceExtended : sources_) { bool bFound {}; for (auto decommit : decommitQueue) { if (decommit == sourceExtended->source) { bFound = true; break; } } if (!bFound) { continue; } AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); for (auto epoll : this->alternativeEpolls_) { epoll->Remove(sourceExtended.get(), true, true); } dwSuccess++; } // TODO (Reece): Urgent. Fails under an IO update dtor. Faking perfect unit tests until i make it. Need linux aurt. //SysAssertDbg(dwSuccess == decommitQueue.size(), "caught SourceRemove on invalid"); return dwSuccess; } bool LoopQueue::Commit() { AU_LOCK_GUARD(this->commitQueueMutex_); for (const auto & [pSource, ms] : AuExchange(this->pendingBlocking_, {})) { this->SourceAddWithTimeoutEx(pSource, ms); } this->lockStealer_.Set(); auto pWritable = this->sourceMutex_->AsWritable(); if (pWritable->TryLock()) { this->lockStealer_.Reset(); if (!CommitDecommit()) { //pWritable->Unlock(); //return false; } auto pending = AuExchange(this->commitPending_, {}); for (auto &source : this->sources_) { for (auto itr = pending.begin(); itr != pending.end(); ) { if (source->source != AuGet<0>(*itr)) { itr ++; continue; } auto a = AuGet<1>(*itr); if (a) { if (!AuTryInsert(source->subscribers, a)) { this->commitPending_ = AuMove(this->commitPending_); pWritable->Unlock(); return false; } } auto b = AuGet<2>(*itr); if (b) { if (!AuTryInsert(source->subscriberExs, b)) { // 1 and 2 are mutually exclusive, dont worry about clean up this->commitPending_ = AuMove(this->commitPending_); pWritable->Unlock(); return false; } } itr = pending.erase(itr); } source->Commit(source); } pWritable->Unlock(); } else { this->bRecommitLater = true; } return true; } bool LoopQueue::IsSignaledPeek() { fd_set readSet; struct timeval tv {}; FD_ZERO(&readSet); FD_SET(this->epollFd_, &readSet); auto active = select(this->epollFd_ + 1, &readSet, NULL, NULL, &tv); if (active == -1) { // todo push error return false; } return active == 1; } // This could be implemented more like a possible BSD implementation // if we were to implement based on io_submit poll bool LoopQueue::WaitAll(AuUInt32 timeoutIn) { AnEpoll epollReference; { AU_LOCK_GUARD(this->globalEpoll_.lock); epollReference = this->globalEpoll_; } epollReference.lock = {}; AuUInt64 timeout {timeoutIn}; if (timeout) { timeout += AuTime::SteadyClockMS(); } { AU_LOCK_GUARD(this->polledItemsMutex_->AsWritable()); AuTryInsert(this->alternativeEpolls_, &epollReference); } bool anythingLeft {}; bool bTimeout {}; do { anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size(); if (!anythingLeft) return true; //WaitAny(0); // [==========] 1 test from 1 test suite ran. (11100 ms total) // ...and a turbojet //bool bTryAgain {}; //DoTick(timeout, {}, &bTryAgain); // ...and + ~10ms latency //bool bTryAgain {}; //DoTick(AuMin(AuUInt64(AuTime::CurrentClockMS() + 4), timeout), {}, &bTryAgain); // [----------] 1 test from Loop (11101 ms total) // ...and no jet engine (+ lower latency than windows) bool bTryAgain {}; DoTick(timeout, {}, &bTryAgain); PumpHooks(); // but this hack should apply to wait any as well, so i'm moving it to the DoTick function anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size(); bTimeout = timeout ? AuTime::SteadyClockMS() >= timeout : false; } while (anythingLeft && !bTimeout); { AU_LOCK_GUARD(this->polledItemsMutex_->AsWritable()); SysAssert(AuTryRemove(this->alternativeEpolls_, &epollReference)); } return !anythingLeft; } AuUInt32 LoopQueue::WaitAny(AuUInt32 timeoutIn) { AuUInt64 timeout = timeoutIn; if (timeout) { timeout += AuTime::SteadyClockMS(); } AuUInt32 cTicked {}; bool bTryAgain {}; do { bTryAgain = false; AuUInt32 ticked = DoTick(timeout, {}, &bTryAgain); PumpHooks(); cTicked += ticked; } while (bTryAgain); return cTicked; } AuUInt32 LoopQueue::PumpNonblocking() { AuUInt32 cTicked {}; bool bTryAgain {}; do { bTryAgain = false; AuUInt32 ticked = DoTick(0, {}, &bTryAgain, true); PumpHooks(); cTicked += ticked; } while (bTryAgain); return cTicked; } AuList> LoopQueue::PumpNonblockingEx() { AuList> ret; bool bTryAgain {}; do { bTryAgain = false; AuUInt32 ticked = DoTick(0, &ret, &bTryAgain, true); PumpHooks(); } while (bTryAgain); return ret; } AuList> LoopQueue::WaitAnyEx(AuUInt32 timeoutIn) { AuList> ret; AuUInt64 timeout = timeoutIn; if (timeout) { timeout += AuTime::SteadyClockMS(); } bool bTryAgain {}; do { bTryAgain = false; AuUInt32 ticked = DoTick(timeout, &ret, &bTryAgain); PumpHooks(); } while (bTryAgain); return ret; } void LoopQueue::AnEpoll::Remove(SourceExtended *source, bool readData, bool writeData) { if (!source->sourceExtended) { return; } auto ex = source->sourceExtended; AU_LOCK_GUARD(this->lock); bool bIsRoot = this == &this->parent->globalEpoll_; if (readData) { for (auto i = startingWorkRead.begin(); i != startingWorkRead.end(); ) { bool doesntMatch {}; auto &fd = i->first; auto &usage = i->second; if (ex->Singular()) { doesntMatch = fd != ex->GetHandle(); } else { doesntMatch = !AuExists(ex->GetHandles(), fd); } if (doesntMatch) { i++; continue; } if ((--(usage)) != 0) { i++; continue; } if (bIsRoot) { if (startingWorkWrite.find(fd) == startingWorkWrite.end()) { epoll_ctl(this->parent->epollFd_, EPOLL_CTL_DEL, fd, nullptr); } else { epoll_event event; event.events = EPOLLOUT; event.data.ptr = source; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, fd, &event); } } i = startingWorkRead.erase(i); } } if (writeData) { for (auto i = startingWorkWrite.begin(); i != startingWorkWrite.end(); ) { bool doesntMatch {}; auto &fd = i->first; auto &usage = i->second; if (ex->Singular()) { doesntMatch = fd != ex->GetWriteHandle(); } else { doesntMatch = !AuExists(ex->GetWriteHandles(), fd); } if (doesntMatch ) { i++; continue; } if ((--(usage)) != 0) { i++; continue; } if (bIsRoot) { if (startingWorkRead.find(fd) == startingWorkRead.end()) { epoll_ctl(this->parent->epollFd_, EPOLL_CTL_DEL, fd, nullptr); } else { epoll_event event; event.events = EPOLLIN; event.data.ptr = source; epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, fd, &event); } } i = startingWorkWrite.erase(i); } } } AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList> *optOut, bool *tryAgain, bool nonblock) { AuUInt32 bTicked {}; AuUInt64 now {}; epoll_event events[128]; AU_LOCK_GUARD(this->sourceMutex_->AsReadable()); for (const auto & source : this->sources_) { if (source->sourceExtended) { source->sourceExtended->OnPresleep(); } } AuInt64 deltaMS = 0; if (time) { deltaMS = AuMin(AuInt64(4), (AuInt64)time - (AuInt64)AuTime::SteadyClockMS()); if (deltaMS < 0) { deltaMS = 0; } } else { deltaMS = nonblock ? 0 : -1; } int iEvents = IO::UNIX::LinuxOverlappedEpollShim(this->epollFd_, events, AuArraySize(events), deltaMS); if (iEvents == -1) { goto out; } for (int i = 0; i < iEvents; i++) { bool readData = events[i].events & EPOLLIN; bool writeData = events[i].events & EPOLLOUT; auto handle = events[i].data.ptr; if (!handle) { continue; } auto base = AuReinterpretCast(handle); if (!base->bHasCommited) { continue; } auto source = base->pin.lock(); auto [ticked, remove, noworkers] = source->DoWork(readData, writeData); bTicked += ticked; if (ticked) { if (optOut) { optOut->push_back(source->source); } } if (remove) { this->sourceMutex_->UpgradeReadToWrite(0); AuTryRemove(this->sources_, source); this->sourceMutex_->DowngradeWriteToRead(); } if (remove) { AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); for (auto epoll : this->alternativeEpolls_) { epoll->Remove(source.get(), readData, writeData); } } // Fire waitall // not sure i like how this fires all anys and alls. // this isnt consistent if (noworkers) { AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); for (auto epoll : this->alternativeEpolls_) { if (epoll != &this->globalEpoll_) { epoll->Remove(source.get(), readData, writeData); } } } } now = AuTime::SteadyClockMS(); if (!bTicked) { if (tryAgain) { *tryAgain = ((this->lockStealer_.IsSignaled()) || (now < time)); } } out: if (!now) { now = AuTime::SteadyClockMS(); } for (auto itr = this->sources_.begin(); itr != this->sources_.end(); ) { AuSPtr source = *itr; bool remove {}; if (!remove) { remove = source->ConsiderTimeout(now); } if (remove) { this->sourceMutex_->UpgradeReadToWrite(0); itr = this->sources_.erase(itr); this->sourceMutex_->DowngradeWriteToRead(); } if (remove) { AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable()); for (auto epoll : this->alternativeEpolls_) { epoll->Remove(source.get(), true, true); } } if (source->sourceExtended) { source->sourceExtended->OnFinishSleep(); } if (!remove) { itr ++; } } return bTicked; } LoopQueue::SourceExtended::SourceExtended(LoopQueue *parent, const AuSPtr &source) : parent(parent), source(source) { this->sourceExtended = AuDynamicCast(source.get()); } LoopQueue::SourceExtended::~SourceExtended() { Deinit(); } void LoopQueue::SourceExtended::Deinit() { this->pin.reset(); } void LoopQueue::SourceExtended::Commit(const AuSPtr &self) { this->pin = self; this->bHasCommited = true; } AuTuple LoopQueue::SourceExtended::DoWork(bool read, bool write) { if (!this->sourceExtended) { return DoWork(-1); } if (this->sourceExtended->Singular()) { AuPair ret; bool bSingleOnlyFlag {true}; if (read) { auto [a, b, c] = DoWork(this->sourceExtended->GetHandle()); ret.first |= a; ret.second |= b; bSingleOnlyFlag &= c; } if (write) { auto [a, b, c] = DoWork(this->sourceExtended->GetWriteHandle()); ret.first |= a; ret.second |= b; bSingleOnlyFlag &= c; } return AuMakeTuple(AuGet<0>(ret), AuGet<1>(ret), bSingleOnlyFlag); } else { // Whatever, I doubt implementing this is worth the perf hit return DoWork(-1); } } AuTuple LoopQueue::SourceExtended::DoWork(int fd) { bool bShouldRemove {true}; AuUInt8 uPosition {}; if (!this->bHasCommited) { return {}; } if (this->sourceExtended) { if (!this->sourceExtended->OnTrigger(fd)) { return {}; } } bool bOverload {}; if ((this->subscribers.empty()) && (this->subscriberExs.empty())) { bOverload = true; } // Notify callbacks... for (auto itr = this->subscriberExs.begin(); itr != this->subscriberExs.end(); ) { bool result; auto handler = *itr; try { result = handler->OnFinished(this->source, uPosition++); } catch (...) { SysPushErrorCatch(); } bShouldRemove &= result; if (result) { itr = this->subscriberExs.erase(itr); } else { itr++; } } for (auto itr = this->subscribers.begin(); itr != this->subscribers.end(); ) { bool result; auto handler = *itr; try { result = handler->OnFinished(this->source); } catch (...) { SysPushErrorCatch(); } bShouldRemove &= result; if (result) { itr = this->subscribers.erase(itr); } else { itr++; } } // Evict when subs count hit zero, not when sub count started off at zero if (bOverload) { bShouldRemove = false; } // Notify global subscribers, allowing them to preempt removal if (bShouldRemove || bOverload) { AU_LOCK_GUARD(this->parent->globalLockMutex_); for (const auto &handler : this->parent->allSubscribers_) { try { bShouldRemove &= handler->OnFinished(this->source); } catch (...) { SysPushErrorCatch(); } } } return AuMakeTuple(true, bShouldRemove, bOverload); } bool LoopQueue::AddHook(const AuFunction &func) { return AuTryInsert(this->epilogueHooks_, func); } void LoopQueue::PumpHooks() { if (AuExchange(this->bRecommitLater, false)) { this->Commit(); } auto c = AuExchange(this->epilogueHooks_, {}); for (auto &a : c) { a(); } } AUKN_SYM AuSPtr NewLoopQueue() { auto queue = AuMakeShared(); if (!queue) { return {}; } if (!queue->Init()) { return {}; } return queue; } }