AuroraRuntime/Source/IO/Loop/LoopQueue.Linux.cpp
J Reece Wilson 02826d2365 [+] AuLoop::kWaitMultipleFlagNoIOCallbacks
[+] AuLoop::kWaitMultipleFlagBreakAfterAPC
[+] Alternative Wait AND implementations for NT, POSIX, and generic
[+] IOConfig::...
[*] LoopQueue improvements
[+] ILoopQueue::ConfigureDoIOApcCallbacks
[+] ILoopQueue::ConfigureDoIOApcCallbacks
2024-10-10 11:03:26 +01:00

1195 lines
33 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: LoopQueue.Linux.cpp
Date: 2022-4-5
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Loop.NT.hpp"
#include "ILoopSourceEx.hpp"
#include "LoopQueue.Linux.hpp"
#include <poll.h>
#include <sys/epoll.h>
#include <Source/Time/Time.hpp>
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
#if defined(AURORA_COMPILER_CLANG)
// warning: ISO C++20 considers use of overloaded operator '!=' (with operand types 'AuSPtr<Aurora::IO::Loop::ILoopSource>' (aka 'ExSharedPtr<Aurora::IO::Loop::ILoopSource, std::shared_ptr<ILoopSource>>') and 'typename tuple_element<0UL, tuple<ExSharedPtr<ILoopSource, shared_ptr<ILoopSource>>, ExSharedPtr<ILoopSourceSubscriber, shared_ptr<ILoopSourceSubscriber>>, ExSharedPtr<ILoopSourceSubscriberEx, shared_ptr<ILoopSourceSubscriberEx>>>>::type' (aka '__type_pack_element<0UL, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSource, std::shared_ptr<Aurora::IO::Loop::ILoopSource>>, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSourceSubscriber, std::shared_ptr<Aurora::IO::Loop::ILoopSourceSubscriber>>, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSourceSubscriberEx, std::shared_ptr<Aurora::IO::Loop::ILoopSourceSubscriberEx>>>')) to be ambiguous despite there being a unique best viable function with non-reversed arguments [-Wambiguous-reversed-operator]
#pragma clang diagnostic ignored "-Wambiguous-reversed-operator"
// Yea, I couldn't give less of a nanoshit what some C++20 spec says. Even llvm/clang doesn't care to language police it into a fatal unimplemented compiler condition. So, idc.
#endif
namespace Aurora::IO::Loop
{
void ResetLoopSourceFalseAlarm(const AuSPtr<Loop::ILoopSource> &pLoopSource);
// On Linux, Loop Queues are glorified eventfd to epoll adapters.
// Requeuing the cached fd array per frame on the TLS io_submit object
// would be more costly than maintaining an epoll for all fds tracked
// by the loop queue.
// We can delegate the wait functions to an NT overlapped like shim
// where all eventfds are one epoll handle
// The TLS worker would get awoken by any changes in the epoll queue
// or if the io submit object should preemptively abort
// The TLS worker would remain responsible for scheduling thread local
// network and file transactions independent from the loop queues
// As such, loop queues continue to be defined as a mechanism to merely
// wait, not dispatch/manage work
// Delegating mutex reads to a single io_submit would be a linux-specific
// kevent-non-reusable ThreadWorkerQueueShim hack
// ...it wouldn't make sense to create another loop queue per thread concept
// outside of the async subsystem (not counting TLS overlapped io)
// Constructs the queue object. Only lockStealer_ is set up here; the epoll
// descriptor and the locks are created by Init().
// NOTE(review): the meaning of the three lockStealer_ constructor flags is
// defined by its type, which is not visible in this file.
LoopQueue::LoopQueue() : lockStealer_(false, false, true)
{
}
// Releases the epoll descriptor (if any) by delegating to Deinit().
LoopQueue::~LoopQueue()
{
Deinit();
}
// Second-stage construction: creates the close-on-exec epoll descriptor,
// allocates both reader/writer locks, and registers the queue's own
// globalEpoll_ bookkeeping object in alternativeEpolls_.
// Returns false at the first step that fails, true once everything is set up.
bool LoopQueue::Init()
{
    if ((this->epollFd_ = epoll_create1(EPOLL_CLOEXEC)) == -1)
    {
        return false;
    }

    if (!(this->sourceMutex_ = AuThreadPrimitives::RWLockUnique()))
    {
        return false;
    }

    if (!(this->polledItemsMutex_ = AuThreadPrimitives::RWLockUnique()))
    {
        return false;
    }

    // The global epoll tracker needs a back-pointer to reach epollFd_.
    this->globalEpoll_.parent = this;

    return AuTryInsert(this->alternativeEpolls_, &this->globalEpoll_);
}
// Closes the epoll descriptor exactly once. The member is swapped to -1
// before closing so a repeated call becomes a no-op.
void LoopQueue::Deinit()
{
    const int iOldFd = AuExchange(this->epollFd_, -1);
    if (iOldFd == -1)
    {
        return;
    }

    ::close(iOldFd);
}
// Starts tracking `source` in this epoll bookkeeping object.
//
// For every read/write handle exposed by the source's ILoopSourceEx interface
// the per-fd usage counters (startingWorkRead / startingWorkWrite) are
// incremented, and the fd is registered on the parent's epoll descriptor with
// EPOLLIN, EPOLLOUT or both. The SourceExtended pointer is stored in
// epoll_event::data.ptr so DoTick can map events back to their source.
//
// Sources without an ILoopSourceEx interface are ignored.
// All epoll_ctl return values are ignored.
// NOTE(review): unlike Remove(), this function does not acquire this->lock;
// presumably callers provide the serialization - confirm.
void LoopQueue::AnEpoll::Add(SourceExtended *source)
{
epoll_event event;
auto ex = source->sourceExtended;
if (!ex)
{
return;
}
event.data.ptr = source;
if (ex->Singular())
{
// Singular sources expose at most one read and one write handle; -1 means "none".
bool bDouble {};
int oldReadRef {};
int oldWriteRef {};
auto read = ex->GetHandle();
if (read != -1)
{
// Post-increment: oldReadRef is the usage count before this Add.
oldReadRef = startingWorkRead[read]++;
// The same fd is already tracked for writing
bDouble |= startingWorkWrite.find(read) != startingWorkWrite.end();
}
auto write = ex->GetWriteHandle();
if (write != -1)
{
oldWriteRef = startingWorkWrite[write]++;
// The same fd is already tracked for reading (including by the increment just above)
bDouble |= startingWorkRead.find(write) != startingWorkRead.end();
}
if (bDouble)
{
// NOTE(review): this local shadows the outer `event`, and `write` is used
// as the target fd even when only the read handle caused bDouble - confirm intent.
epoll_event event;
event.events = EPOLLOUT | EPOLLIN;
event.data.ptr = source;
if ((oldReadRef == 0) && (oldWriteRef == 0))
{
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, write, &event);
}
else
{
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, write, &event);
}
}
// First user of the write fd: register it for writability.
// NOTE(review): these two ADDs also run after the bDouble path; for an fd that
// was just registered above the call is expected to fail and is ignored.
if ((write != -1) && (!oldWriteRef))
{
event.events = EPOLLOUT;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, write, &event);
}
// First user of the read fd: register it for readability.
if ((read != -1) && (!oldReadRef))
{
event.events = EPOLLIN;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, read, &event);
}
}
else
{
// Multi-handle sources: walk the read list, then the write list.
auto read = ex->GetHandles();
auto write = ex->GetWriteHandles();
for (auto readHandle : read)
{
auto count = startingWorkRead[readHandle]++;
if (count)
{
// Already registered by an earlier Add
continue;
}
if (AuExists(write, readHandle))
{
// Registered with EPOLLIN | EPOLLOUT by the write loop below
continue;
}
event.events = EPOLLIN;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, readHandle, &event);
}
for (auto writeHandle : write)
{
auto count = startingWorkWrite[writeHandle]++;
if (count)
{
// Already tracked for writing; widen the mask if this source also reads from it
if (AuExists(read, writeHandle))
{
event.events = EPOLLOUT | EPOLLIN;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, writeHandle, &event);
}
continue;
}
if (AuExists(read, writeHandle))
{
event.events = EPOLLOUT | EPOLLIN;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, writeHandle, &event);
}
else
{
event.events = EPOLLOUT;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, writeHandle, &event);
}
}
}
}
bool LoopQueue::SourceAdd(const AuSPtr<ILoopSource> &source)
{
return LoopQueue::SourceAddWithTimeout(source, 0);
}
bool LoopQueue::SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 ms)
{
return LoopQueue::SourceAddWithTimeoutEx(source, ms);
}
bool LoopQueue::SourceAddWithTimeoutEx(const AuSPtr<ILoopSource> &source, AuUInt32 ms)
{
auto pSrc = AuMakeShared<SourceExtended>(this, source);
if (!pSrc)
{
return false;
}
if (ms)
{
pSrc->timeoutAbs = (AuUInt64)ms + AuTime::SteadyClockMS();
}
else
{
pSrc->timeoutAbs = 0;
}
auto pWaitable = this->sourceMutex_->AsWritable();
auto pLocked = pWaitable->TryLock();
if (pLocked)
{
if (!AuTryInsert(this->pendingBlocking_, AuMakePair(pSrc, ms)))
{
pWaitable->Unlock();
return false;
}
else
{
pWaitable->Unlock();
return true;
}
}
else
{
AU_LOCK_GUARD(this->commitQueueMutex_);
this->lockStealer_.Set();
return AuTryInsert(this->pendingBlocking_, AuMakePair(pSrc, ms));
}
}
// Schedules `source` for removal; the request is applied by CommitDecommit()
// during the next Commit(). Returns false if the request could not be queued.
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
{
    AU_LOCK_GUARD(this->commitQueueMutex_);

    const bool bQueued = AuTryInsert(this->decommitQueue_, source);
    return bQueued;
}
// Returns the number of committed sources. The container is read without
// taking sourceMutex_.
AuUInt32 LoopQueue::GetSourceCount()
{
    const auto uCount = this->sources_.size();
    return static_cast<AuUInt32>(uCount);
}
bool LoopQueue::AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
AU_LOCK_GUARD(this->commitQueueMutex_);
return AuTryInsert(this->commitPending_, AuMakeTuple(source, subscriber, AuSPtr<ILoopSourceSubscriberEx>{}));
}
bool LoopQueue::AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber)
{
AU_LOCK_GUARD(this->commitQueueMutex_);
return AuTryInsert(this->commitPending_, AuMakeTuple(source, AuSPtr<ILoopSourceSubscriber>{}, subscriber));
}
// Registers a queue-wide subscriber. These are consulted by
// SourceExtended::DoWorkLate() for every source of this queue.
bool LoopQueue::AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
    AU_LOCK_GUARD(this->globalLockMutex_);

    const bool bRegistered = AuTryInsert(this->allSubscribers_, subscriber);
    return bRegistered;
}
// Part of the ILoopQueue interface; has no effect in this implementation.
void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount)
{
    // Intentionally NO-OP under Linux
    (void)sectionTickTime;
    (void)sectionDequeCount;
}
// Part of the ILoopQueue interface; has no effect in this implementation.
void LoopQueue::ChugHint(bool value)
{
    // Intentionally NO-OP under Linux
    (void)value;
}
// Selects how DoTick waits: when true it goes through
// IO::UNIX::LinuxOverlappedEpollShim, otherwise it calls epoll_wait directly.
void LoopQueue::ConfigureDoIOApcCallbacks(bool bDoIOAPCs)
{
this->bDoIOAPCs_ = bDoIOAPCs;
}
// Stores the "break after APC" preference.
// NOTE(review): bAPCBreaks_ is not read anywhere in this file; its consumer
// is presumably elsewhere - confirm.
void LoopQueue::ConfigureBreakAnyAfterAPC(bool bAPCBreaks)
{
this->bAPCBreaks_ = bAPCBreaks;
}
bool LoopQueue::CommitDecommit()
{
AuUInt32 dwSuccess {};
if (this->decommitQueue_.empty())
{
return true;
}
AU_DEBUG_MEMCRUNCH;
auto decommitQueue = AuExchange(this->decommitQueue_, {});
for (auto sourceExtended : sources_)
{
bool bFound {};
for (auto decommit : decommitQueue)
{
if (decommit == sourceExtended->source)
{
bFound = true;
break;
}
}
if (!bFound)
{
continue;
}
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
epoll->Remove(sourceExtended.get(), true, true);
}
dwSuccess++;
}
// TODO (Reece): Urgent. Fails under an IO update dtor. Faking perfect unit tests until i make it. Need linux aurt.
//SysAssertDbg(dwSuccess == decommitQueue.size(), "caught SourceRemove on invalid");
return dwSuccess;
}
// Folds all queued mutations into the live queue state:
//   1. moves pendingBlocking_ sources into sources_ and registers them with
//      the global epoll tracker,
//   2. applies queued removals (CommitDecommit),
//   3. attaches queued subscribers from commitPending_ to their sources and
//      marks each source as committed.
//
// The write side of sourceMutex_ is required. If it cannot be taken at once,
// bRecommitLater is set, lockStealer_ is signalled, and the lock is retried
// with a bounded wait; if that also fails the work is left for a later call.
//
// Returns false only when attaching a subscriber fails; in that case the
// subscriber requests not yet attached are put back into commitPending_ so a
// later Commit() can retry them. Returns true otherwise, including when the
// lock could not be acquired.
bool LoopQueue::Commit()
{
    AU_DEBUG_MEMCRUNCH;
    AU_LOCK_GUARD(this->commitQueueMutex_);

    auto pWritable = this->sourceMutex_->AsWritable();
    if (!pWritable->TryLock())
    {
        this->bRecommitLater = true;
        this->lockStealer_.Set();

        // try lock doesn't block future reads. this might.
        if (!pWritable->LockNS(250'000))
        {
            return true;
        }
    }

    // From here on the write lock is held and must be released on every path.

    for (const auto & [pSource, ms] : AuExchange(this->pendingBlocking_, {}))
    {
        if (!AuTryInsert(this->sources_, AuConstReference(pSource)))
        {
            SysPanic("OOM under memcrunch. this shouldn't be possible?");
        }

        this->globalEpoll_.Add(pSource.get());
    }

    // The result is deliberately ignored:
    // TODO: this was commented out years ago
    // fix bug?
    //pWritable->Unlock();
    //return false;
    (void)CommitDecommit();

    auto pending = AuExchange(this->commitPending_, {});

    for (auto &source : this->sources_)
    {
        for (auto itr = pending.begin(); itr != pending.end(); )
        {
            if (source->source != AuGet<0>(*itr))
            {
                ++itr;
                continue;
            }

            auto pPlain = AuGet<1>(*itr);
            if (pPlain)
            {
                if (!AuTryInsert(source->subscribers, pPlain))
                {
                    // Hand the unattached requests back for a later retry.
                    // (The original self-assigned commitPending_, losing them.)
                    this->commitPending_ = AuMove(pending);
                    pWritable->Unlock();
                    return false;
                }
            }

            auto pEx = AuGet<2>(*itr);
            if (pEx)
            {
                if (!AuTryInsert(source->subscriberExs, pEx))
                {
                    // 1 and 2 are mutually exclusive, dont worry about clean up
                    this->commitPending_ = AuMove(pending);
                    pWritable->Unlock();
                    return false;
                }
            }

            itr = pending.erase(itr);
        }

        source->Commit(source);
    }

    pWritable->Unlock();
    return true;
}
// Non-blocking check of whether the queue's epoll descriptor currently has
// at least one pending event. Nothing is dequeued or dispatched.
//
// Uses poll() rather than select(): FD_SET on a descriptor that is negative
// or >= FD_SETSIZE is undefined behaviour, and a busy process can easily hold
// descriptors above that limit. poll() has no such limit and ignores a
// negative descriptor.
//
// Returns true if the epoll descriptor is readable, false otherwise
// (including on error).
bool LoopQueue::IsSignaledPeek()
{
    struct pollfd pollDesc {};
    pollDesc.fd = this->epollFd_;
    pollDesc.events = POLLIN;

    const int iActive = ::poll(&pollDesc, 1, 0);
    if (iActive == -1)
    {
        // todo push error
        return false;
    }

    return (iActive == 1) && ((pollDesc.revents & POLLIN) != 0);
}
// This could be implemented more like a possible BSD implementation
// if we were to implement based on io_submit poll
//
// Blocks until every source tracked at the time of the call has signalled,
// or until the timeout elapses.
//
// timeoutIn: relative timeout in milliseconds; 0 means no deadline.
// Returns true if all tracked handles were consumed (or there was nothing to
// wait on), false on timeout or if the temporary tracker could not be
// registered.
//
// A private snapshot of the global epoll bookkeeping (epollReference) is
// registered in alternativeEpolls_, so DoTick strikes handles off it as they
// fire. epollReference lives on this stack frame, so every exit path after
// registration must unregister it again.
bool LoopQueue::WaitAll(AuUInt32 timeoutIn)
{
    AU_DEBUG_MEMCRUNCH;

    AnEpoll epollReference;
    {
        AU_LOCK_GUARD(this->globalEpoll_.lock);
        epollReference = this->globalEpoll_;
    }
    epollReference.lock = {};
    epollReference.bAll = true;

    // Nothing to wait for: leave before the stack object is published.
    if (!epollReference.startingWorkRead.size() &&
        !epollReference.startingWorkWrite.size())
    {
        return true;
    }

    AuUInt64 timeout {timeoutIn};
    if (timeout)
    {
        timeout += AuTime::SteadyClockMS();
    }

    {
        AU_LOCK_GUARD(this->polledItemsMutex_->AsWritable());
        if (!AuTryInsert(this->alternativeEpolls_, &epollReference))
        {
            // Without registration DoTick could never update the snapshot.
            return false;
        }
    }

    bool anythingLeft {};
    bool bTimeout {};

    do
    {
        anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size();
        if (!anythingLeft)
        {
            // Fall through to the common clean-up instead of returning, so the
            // pointer to epollReference never outlives this frame.
            break;
        }

        //WaitAny(0);
        // [==========] 1 test from 1 test suite ran. (11100 ms total)
        // ...and a turbojet

        //bool bTryAgain {};
        //DoTick(timeout, {}, &bTryAgain);
        // ...and + ~10ms latency

        //bool bTryAgain {};
        //DoTick(AuMin(AuUInt64(AuTime::CurrentClockMS() + 4), timeout), {}, &bTryAgain);
        // [----------] 1 test from Loop (11101 ms total)
        // ...and no jet engine (+ lower latency than windows)

        bool bTryAgain {};
        DoTick(timeout, {}, &bTryAgain, false, true, &epollReference);
        PumpHooks();

        // but this hack should apply to wait any as well, so i'm moving it to the DoTick function

        anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size();
        bTimeout = timeout ? AuTime::SteadyClockMS() >= timeout : false;
    }
    while (anythingLeft && !bTimeout);

    {
        AU_LOCK_GUARD(this->polledItemsMutex_->AsWritable());
        // Keep the side effect out of the assertion expression.
        const bool bUnregistered = AuTryRemove(this->alternativeEpolls_, &epollReference);
        SysAssert(bUnregistered);
    }

    {
        AU_LOCK_GUARD(this->sourceMutex_->AsWritable());

        // Sources that fired were pulled out of sources_ by DoTick. Either
        // dispatch them now (complete wait) or roll them back (timeout).
        for (auto & pSource : epollReference.done)
        {
            bool bReAdd { false };

            if (anythingLeft)
            {
                // Timed out: undo the consumption and keep tracking the source.
                bReAdd = true;
                ResetLoopSourceFalseAlarm(pSource->source);
            }
            else
            {
                auto [ticked, remove, noworkers] = pSource->DoWorkLate();
                if (!remove)
                {
                    bReAdd = true;
                }
            }

            if (!bReAdd)
            {
                continue;
            }

            AuTryInsert(this->sources_, AuConstReference(pSource));

            {
                AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
                for (auto epoll : this->alternativeEpolls_)
                {
                    epoll->Add(pSource.get());
                }
            }
        }
    }

    return !anythingLeft;
}
// Ticks the queue until DoTick stops asking for a retry, running the
// epilogue hooks after each tick.
//
// timeoutIn: relative timeout in milliseconds; 0 is passed to DoTick as
// "no deadline".
// Returns the sum of the tick counts reported by DoTick.
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeoutIn)
{
    AuUInt64 uDeadline = timeoutIn;
    if (uDeadline)
    {
        uDeadline += AuTime::SteadyClockMS();
    }

    AU_DEBUG_MEMCRUNCH;

    AuUInt32 uTotal {};
    bool bRetry {};

    for (;;)
    {
        bRetry = false;

        const AuUInt32 uTicked = DoTick(uDeadline, {}, &bRetry);
        PumpHooks();
        uTotal += uTicked;

        if (!bRetry)
        {
            break;
        }
    }

    return uTotal;
}
// Polls the queue without blocking (DoTick is called with nonblock = true),
// repeating while DoTick requests a retry.
// Returns the sum of the tick counts reported by DoTick.
AuUInt32 LoopQueue::PumpNonblocking()
{
    AuUInt32 uTotal {};
    bool bRetry {};

    for (;;)
    {
        bRetry = false;

        const AuUInt32 uTicked = DoTick(0, {}, &bRetry, true);
        PumpHooks();
        uTotal += uTicked;

        if (!bRetry)
        {
            break;
        }
    }

    return uTotal;
}
// Non-blocking poll that returns the sources which ticked, as collected by
// DoTick, instead of a count.
AuList<AuSPtr<ILoopSource>> LoopQueue::PumpNonblockingEx()
{
    AuList<AuSPtr<ILoopSource>> signaled;
    bool bRetry {};

    for (;;)
    {
        bRetry = false;

        DoTick(0, &signaled, &bRetry, true);
        PumpHooks();

        if (!bRetry)
        {
            break;
        }
    }

    return signaled;
}
// Like WaitAny, but returns the sources which ticked, as collected by DoTick.
//
// timeoutIn: relative timeout in milliseconds; 0 is passed to DoTick as
// "no deadline".
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeoutIn)
{
    AuList<AuSPtr<ILoopSource>> signaled;

    AuUInt64 uDeadline = timeoutIn;
    if (uDeadline)
    {
        uDeadline += AuTime::SteadyClockMS();
    }

    bool bRetry {};
    for (;;)
    {
        bRetry = false;

        DoTick(uDeadline, &signaled, &bRetry);
        PumpHooks();

        if (!bRetry)
        {
            break;
        }
    }

    return signaled;
}
// Drops one usage of the source's handles from this tracker.
//
// source:    the tracked source; ignored if it has no ILoopSourceEx interface.
// readData:  process the read-side usage map (startingWorkRead).
// writeData: process the write-side usage map (startingWorkWrite).
//
// For every matching fd the usage counter is decremented; once it reaches
// zero the entry is erased. Only the queue's root tracker (globalEpoll_)
// touches the kernel epoll set: the fd is deleted, or re-armed for the other
// direction if that direction is still tracked.
// Runs under this->lock. epoll_ctl return values are ignored.
void LoopQueue::AnEpoll::Remove(SourceExtended *source, bool readData, bool writeData)
{
if (!source->sourceExtended)
{
return;
}
auto ex = source->sourceExtended;
AU_LOCK_GUARD(this->lock);
// Only the root tracker mirrors its state into the kernel epoll set
bool bIsRoot = this == &this->parent->globalEpoll_;
if (readData)
{
for (auto i = startingWorkRead.begin(); i != startingWorkRead.end(); )
{
bool doesntMatch {};
auto &fd = i->first;
auto &usage = i->second;
if (ex->Singular())
{
doesntMatch = fd != ex->GetHandle();
}
else
{
doesntMatch = !AuExists(ex->GetHandles(), fd);
}
if (doesntMatch)
{
i++;
continue;
}
// Other users of this fd remain: keep the entry
if ((--(usage)) != 0)
{
i++;
continue;
}
if (bIsRoot)
{
if (startingWorkWrite.find(fd) == startingWorkWrite.end())
{
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_DEL, fd, nullptr);
}
else
{
// Still tracked for writing: narrow the mask to EPOLLOUT.
// NOTE(review): data.ptr is re-pointed at the source being removed here - confirm this is intended.
epoll_event event;
event.events = EPOLLOUT;
event.data.ptr = source;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, fd, &event);
}
}
i = startingWorkRead.erase(i);
}
}
if (writeData)
{
// Mirror image of the read-side pass above
for (auto i = startingWorkWrite.begin(); i != startingWorkWrite.end(); )
{
bool doesntMatch {};
auto &fd = i->first;
auto &usage = i->second;
if (ex->Singular())
{
doesntMatch = fd != ex->GetWriteHandle();
}
else
{
doesntMatch = !AuExists(ex->GetWriteHandles(), fd);
}
if (doesntMatch )
{
i++;
continue;
}
if ((--(usage)) != 0)
{
i++;
continue;
}
if (bIsRoot)
{
if (startingWorkRead.find(fd) == startingWorkRead.end())
{
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_DEL, fd, nullptr);
}
else
{
// Still tracked for reading: narrow the mask to EPOLLIN
epoll_event event;
event.events = EPOLLIN;
event.data.ptr = source;
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, fd, &event);
}
}
i = startingWorkWrite.erase(i);
}
}
}
// One iteration of the queue: commit pending changes, wait on the epoll
// descriptor, dispatch (or, in "all" mode, collect) the sources that fired,
// then sweep every source for timeouts.
//
// time:      absolute deadline on the AuTime::SteadyClockMS scale; 0 = none.
// optOut:    optional list that receives the ILoopSource of every source that ticked.
// tryAgain:  optional; written only when nothing ticked and the wait did not
//            fail. Set when lockStealer_ is signalled or the deadline has not passed yet.
// nonblock:  with no deadline, poll (timeout 0) instead of blocking.
// all:       WaitAll mode; triggered sources are removed from sources_ and
//            appended to pAllEpoll->done instead of being dispatched.
// pAllEpoll: the WaitAll tracker; dereferenced only when `all` is true.
//
// Returns the number of sources that ticked.
// The read side of sourceMutex_ is held from just after Commit() until return;
// it is briefly upgraded to write around each erase from sources_.
AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList<AuSPtr<ILoopSource>> *optOut, bool *tryAgain, bool nonblock, bool all, AnEpoll *pAllEpoll)
{
AuUInt32 bTicked {};
AuUInt64 now {};
epoll_event events[128];
this->Commit();
AU_LOCK_GUARD(this->sourceMutex_->AsReadable());
// Give every extended source a chance to prepare before the wait
for (const auto & source : this->sources_)
{
if (source->sourceExtended)
{
source->sourceExtended->OnPresleep();
}
}
// Timeout handed to the wait call, in milliseconds
AuInt64 deltaMS = 0;
if (time)
{
// Sleep at most 150ms per tick, and never a negative amount once the deadline has passed
deltaMS = AuMin(AuInt64(150), (AuInt64)time - (AuInt64)AuTime::SteadyClockMS());
if (deltaMS < 0)
{
deltaMS = 0;
}
}
else
{
// No deadline: poll when nonblocking, otherwise -1 (epoll_wait's "no timeout")
deltaMS = nonblock ? 0 : -1;
}
int iEvents;
if (this->bDoIOAPCs_)
{
iEvents = IO::UNIX::LinuxOverlappedEpollShim(this->epollFd_, events, AuArraySize(events), deltaMS);
}
else
{
iEvents = epoll_wait(this->epollFd_, events, AuArraySize(events), deltaMS);
}
if (iEvents == -1)
{
// Wait failed: skip dispatch but still run the timeout sweep
goto out;
}
for (int i = 0; i < iEvents; i++)
{
bool readData = events[i].events & EPOLLIN;
bool writeData = events[i].events & EPOLLOUT;
// data.ptr is the SourceExtended pointer stored by AnEpoll::Add / Remove
auto handle = events[i].data.ptr;
if (!handle)
{
continue;
}
auto base = AuReinterpretCast<SourceExtended *>(handle);
if (!base->bHasCommited)
{
continue;
}
// NOTE(review): the result of pin.lock() is used below without a null check - confirm it cannot be empty here.
auto source = base->pin.lock();
if (all)
{
// WaitAll mode: only test the trigger; subscribers run later via DoWorkLate()
bool bTriggered = source->CanDispatchLayer(readData, writeData);
if (!bTriggered)
{
continue;
}
{
// NOTE(review): the return value of UpgradeReadToWrite(0) is ignored here and below.
this->sourceMutex_->UpgradeReadToWrite(0);
AuTryRemove(this->sources_, source);
this->sourceMutex_->DowngradeWriteToRead();
}
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
{
// NOTE(review): non-global trackers get a second Remove() here - confirm the double decrement is intended.
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
if (epoll != &this->globalEpoll_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
}
bTicked += 1;
pAllEpoll->done.push_back(source);
}
else
{
// Normal mode: run the subscribers now
auto [ticked, remove, noworkers] = source->DoWork(readData, writeData);
bTicked += ticked;
if (ticked)
{
if (optOut)
{
optOut->push_back(source->source);
}
}
if (remove)
{
this->sourceMutex_->UpgradeReadToWrite(0);
AuTryRemove(this->sources_, source);
this->sourceMutex_->DowngradeWriteToRead();
}
if (remove)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
// Fire waitall
// not sure i like how this fires all anys and alls.
// this isnt consistent
if (noworkers)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
if (epoll != &this->globalEpoll_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
}
}
}
now = AuTime::SteadyClockMS();
if (!bTicked)
{
if (tryAgain)
{
*tryAgain = ((this->lockStealer_.IsSignaled()) ||
(now < time));
}
}
out:
// `now` is still 0 when the wait failed and the block above was skipped
if (!now)
{
now = AuTime::SteadyClockMS();
}
// Timeout sweep: evict sources whose ConsiderTimeout() reports true, and notify
// every extended source that the sleep finished
for (auto itr = this->sources_.begin(); itr != this->sources_.end(); )
{
AuSPtr<SourceExtended> source = *itr;
bool remove {};
if (!remove)
{
remove = source->ConsiderTimeout(now);
}
if (remove)
{
this->sourceMutex_->UpgradeReadToWrite(0);
itr = this->sources_.erase(itr);
this->sourceMutex_->DowngradeWriteToRead();
}
if (remove)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
epoll->Remove(source.get(), true, true);
}
}
if (source->sourceExtended)
{
source->sourceExtended->OnFinishSleep();
}
// erase() already advanced the iterator for removed entries
if (!remove)
{
itr ++;
}
}
return bTicked;
}
// Wraps a user-supplied loop source for tracking by `parent`.
// sourceExtended caches the source's ILoopSourceEx interface; the rest of
// this file treats a null value as "no extended interface".
LoopQueue::SourceExtended::SourceExtended(LoopQueue *parent, const AuSPtr<ILoopSource> &source) :
parent(parent),
source(source)
{
this->sourceExtended = AuDynamicCast<ILoopSourceEx>(source.get());
}
// Drops the self-reference via Deinit().
LoopQueue::SourceExtended::~SourceExtended()
{
Deinit();
}
// Clears the self-reference (`pin`) that Commit() installed.
void LoopQueue::SourceExtended::Deinit()
{
this->pin.reset();
}
// Marks this record as live. `self` is the owning pointer to this object; it
// is stored in `pin` so DoTick can recover an owning pointer (pin.lock())
// from the raw pointer carried in epoll_event::data.ptr.
// bHasCommited is set after `pin` is assigned; DoTick and CanDispatchLayer
// skip records that are not yet committed.
void LoopQueue::SourceExtended::Commit(const AuSPtr<SourceExtended> &self)
{
this->pin = self;
this->bHasCommited = true;
}
// Dispatches an epoll event for this source.
//
// read / write: which directions the event reported.
// Returns (ticked, remove, noworkers), OR-ing the first two and AND-ing the
// third across the read and write handle of a singular source. Sources
// without an extended interface, and multi-handle sources, are dispatched
// once with the handle -1.
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(bool read, bool write)
{
    // Whatever, I doubt implementing per-handle dispatch for multi-handle
    // sources is worth the perf hit
    if (!this->sourceExtended || !this->sourceExtended->Singular())
    {
        return DoWork(-1);
    }

    bool bTicked {};
    bool bRemove {};
    bool bNoWorkers { true };

    if (read)
    {
        auto [bT, bR, bN] = DoWork(this->sourceExtended->GetHandle());
        bTicked |= bT;
        bRemove |= bR;
        bNoWorkers &= bN;
    }

    if (write)
    {
        auto [bT, bR, bN] = DoWork(this->sourceExtended->GetWriteHandle());
        bTicked |= bT;
        bRemove |= bR;
        bNoWorkers &= bN;
    }

    return AuMakeTuple(bTicked, bRemove, bNoWorkers);
}
// Tests whether an epoll event really triggers this source, without running
// any subscribers.
//
// read / write: which directions the event reported.
// Returns false for sources without an extended interface. A singular source
// is probed on its read handle first and then on its write handle; a
// multi-handle source is probed once with the handle -1.
bool LoopQueue::SourceExtended::CanDispatchLayer(bool read, bool write)
{
    if (!this->sourceExtended)
    {
        return false;
    }

    if (!this->sourceExtended->Singular())
    {
        // Whatever, I doubt implementing this is worth the perf hit
        return CanDispatchLayer(-1);
    }

    if (read && CanDispatchLayer(this->sourceExtended->GetHandle()))
    {
        return true;
    }

    if (write && CanDispatchLayer(this->sourceExtended->GetWriteHandle()))
    {
        return true;
    }

    return false;
}
// Per-handle trigger test. Uncommitted records never dispatch; committed
// records without an extended interface always do; otherwise the decision is
// delegated to ILoopSourceEx::OnTrigger(fd).
bool LoopQueue::SourceExtended::CanDispatchLayer(int fd)
{
    if (!this->bHasCommited)
    {
        return false;
    }

    if (!this->sourceExtended)
    {
        return true;
    }

    return this->sourceExtended->OnTrigger(fd) ? true : false;
}
// Runs the subscribers for `fd` if the trigger test passes; otherwise returns
// an all-false (ticked, remove, noworkers) tuple.
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(int fd)
{
    if (CanDispatchLayer(fd))
    {
        return DoWorkLate();
    }

    return AuTuple<bool, bool, bool> {};
}
// Notifies this source's subscribers that it has signalled.
//
// Extended subscribers run first (each receives its position index), then
// plain subscribers. A subscriber whose OnFinished() returns true is dropped
// from its list. If the source had no subscribers at all ("overload"), or if
// every subscriber asked for removal, the queue-wide subscribers are
// consulted as well and may veto removal.
//
// A subscriber that throws is treated as having returned false: it stays
// subscribed and the source is not removed because of it. (Previously
// `result` was read uninitialized on that path.)
//
// Returns (ticked = true, remove, noworkers), where `noworkers` reports that
// the source had no per-source subscribers when the call started.
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWorkLate()
{
    bool bShouldRemove {true};
    AuUInt8 uPosition {};
    bool bOverload {};

    if ((this->subscribers.empty()) &&
        (this->subscriberExs.empty()))
    {
        bOverload = true;
    }

    // Notify callbacks...
    for (auto itr = this->subscriberExs.begin();
         itr != this->subscriberExs.end(); )
    {
        bool result {};
        // Copy the handle so it stays alive for the duration of the call
        auto handler = *itr;

        try
        {
            result = handler->OnFinished(this->source, uPosition++);
        }
        catch (...)
        {
            SysPushErrorCatch();
        }

        bShouldRemove &= result;

        if (result)
        {
            itr = this->subscriberExs.erase(itr);
        }
        else
        {
            ++itr;
        }
    }

    for (auto itr = this->subscribers.begin();
         itr != this->subscribers.end(); )
    {
        bool result {};
        auto handler = *itr;

        try
        {
            result = handler->OnFinished(this->source);
        }
        catch (...)
        {
            SysPushErrorCatch();
        }

        bShouldRemove &= result;

        if (result)
        {
            itr = this->subscribers.erase(itr);
        }
        else
        {
            ++itr;
        }
    }

    // Evict when subs count hit zero, not when sub count started off at zero
    if (bOverload)
    {
        bShouldRemove = false;
    }

    // Notify global subscribers, allowing them to preempt removal
    if (bShouldRemove || bOverload)
    {
        AU_LOCK_GUARD(this->parent->globalLockMutex_);
        for (const auto &handler : this->parent->allSubscribers_)
        {
            try
            {
                bShouldRemove &= handler->OnFinished(this->source);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }
        }
    }

    return AuMakeTuple(true, bShouldRemove, bOverload);
}
// Queues a one-shot callback; PumpHooks() takes the whole list and runs each
// entry once. No lock is taken here.
// Returns false if the callback could not be stored.
bool LoopQueue::AddHook(const AuFunction<void()> &func)
{
return AuTryInsert(this->epilogueHooks_, func);
}
// Post-tick housekeeping: performs a deferred Commit() if one was requested,
// then runs and discards every queued epilogue hook. The hook list is swapped
// out before the hooks run.
void LoopQueue::PumpHooks()
{
    const bool bCommitRequested = AuExchange(this->bRecommitLater, false);
    if (bCommitRequested)
    {
        this->Commit();
    }

    auto pendingHooks = AuExchange(this->epilogueHooks_, {});
    for (auto &hook : pendingHooks)
    {
        hook();
    }
}
// Factory for the Linux loop queue. Returns an empty pointer if allocation
// or LoopQueue::Init() fails.
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
{
    auto pQueue = AuMakeShared<LoopQueue>();

    if (pQueue && pQueue->Init())
    {
        return pQueue;
    }

    return {};
}
}