[*] Improve ILoopQueue::WaitAll parity (been on the todo list for, idk, 3 years)

This commit is contained in:
Reece Wilson 2024-08-01 12:55:08 +01:00
parent c6ce29a188
commit cc01b0557a
2 changed files with 188 additions and 36 deletions

View File

@ -21,6 +21,8 @@
namespace Aurora::IO::Loop namespace Aurora::IO::Loop
{ {
void ResetLoopSourceFalseAlarm(const AuSPtr<Loop::ILoopSource> &pLoopSource);
// On Linux, Loop Queues are glorified eventfd to epoll adapters. // On Linux, Loop Queues are glorified eventfd to epoll adapters.
// Requeuing the cached fd array per frame on the TLS io_submit object // Requeuing the cached fd array per frame on the TLS io_submit object
// would be more costly than maintaining an epoll for all fds tracked // would be more costly than maintaining an epoll for all fds tracked
@ -295,6 +297,8 @@ namespace Aurora::IO::Loop
return true; return true;
} }
AU_DEBUG_MEMCRUNCH;
auto decommitQueue = AuExchange(this->decommitQueue_, {}); auto decommitQueue = AuExchange(this->decommitQueue_, {});
for (auto sourceExtended : sources_) for (auto sourceExtended : sources_)
@ -333,6 +337,8 @@ namespace Aurora::IO::Loop
{ {
AU_LOCK_GUARD(this->commitQueueMutex_); AU_LOCK_GUARD(this->commitQueueMutex_);
AU_DEBUG_MEMCRUNCH;
for (const auto & [pSource, ms] : AuExchange(this->pendingBlocking_, {})) for (const auto & [pSource, ms] : AuExchange(this->pendingBlocking_, {}))
{ {
this->SourceAddWithTimeoutEx(pSource, ms); this->SourceAddWithTimeoutEx(pSource, ms);
@ -368,7 +374,7 @@ namespace Aurora::IO::Loop
if (!AuTryInsert(source->subscribers, a)) if (!AuTryInsert(source->subscribers, a))
{ {
this->commitPending_ = AuMove(this->commitPending_); this->commitPending_ = AuMove(this->commitPending_);
pWritable->Unlock(); pWritable->Unlock();
return false; return false;
} }
} }
@ -380,7 +386,7 @@ namespace Aurora::IO::Loop
{ {
// 1 and 2 are mutually exclusive, dont worry about clean up // 1 and 2 are mutually exclusive, dont worry about clean up
this->commitPending_ = AuMove(this->commitPending_); this->commitPending_ = AuMove(this->commitPending_);
pWritable->Unlock(); pWritable->Unlock();
return false; return false;
} }
} }
@ -422,12 +428,15 @@ namespace Aurora::IO::Loop
// if we were to implement based on io_submit poll // if we were to implement based on io_submit poll
bool LoopQueue::WaitAll(AuUInt32 timeoutIn) bool LoopQueue::WaitAll(AuUInt32 timeoutIn)
{ {
AU_DEBUG_MEMCRUNCH;
AnEpoll epollReference; AnEpoll epollReference;
{ {
AU_LOCK_GUARD(this->globalEpoll_.lock); AU_LOCK_GUARD(this->globalEpoll_.lock);
epollReference = this->globalEpoll_; epollReference = this->globalEpoll_;
} }
epollReference.lock = {}; epollReference.lock = {};
epollReference.bAll = true;
AuUInt64 timeout {timeoutIn}; AuUInt64 timeout {timeoutIn};
if (timeout) if (timeout)
@ -462,7 +471,7 @@ namespace Aurora::IO::Loop
bool bTryAgain {}; bool bTryAgain {};
DoTick(timeout, {}, &bTryAgain); DoTick(timeout, {}, &bTryAgain, false, true, &epollReference);
PumpHooks(); PumpHooks();
// but this hack should apply to wait any as well, so i'm moving it to the DoTick function // but this hack should apply to wait any as well, so i'm moving it to the DoTick function
@ -475,6 +484,47 @@ namespace Aurora::IO::Loop
SysAssert(AuTryRemove(this->alternativeEpolls_, &epollReference)); SysAssert(AuTryRemove(this->alternativeEpolls_, &epollReference));
} }
{
AU_LOCK_GUARD(this->sourceMutex_->AsWritable());
for (auto & pSource : epollReference.done)
{
bool bReAdd { false };
if (anythingLeft)
{
bReAdd = true;
ResetLoopSourceFalseAlarm(pSource->source);
}
else
{
auto [ticked, remove, noworkers] = pSource->DoWorkLate();
if (!remove)
{
bReAdd = true;
}
}
if (!bReAdd)
{
continue;
}
{
AuTryInsert(this->sources_, AuConstReference(pSource));
}
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
epoll->Add(pSource.get());
}
}
}
}
return !anythingLeft; return !anythingLeft;
} }
@ -487,6 +537,8 @@ namespace Aurora::IO::Loop
timeout += AuTime::SteadyClockMS(); timeout += AuTime::SteadyClockMS();
} }
AU_DEBUG_MEMCRUNCH;
AuUInt32 cTicked {}; AuUInt32 cTicked {};
bool bTryAgain {}; bool bTryAgain {};
do do
@ -661,7 +713,7 @@ namespace Aurora::IO::Loop
} }
} }
AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList<AuSPtr<ILoopSource>> *optOut, bool *tryAgain, bool nonblock) AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList<AuSPtr<ILoopSource>> *optOut, bool *tryAgain, bool nonblock, bool all, AnEpoll *pAllEpoll)
{ {
AuUInt32 bTicked {}; AuUInt32 bTicked {};
AuUInt64 now {}; AuUInt64 now {};
@ -716,46 +768,85 @@ namespace Aurora::IO::Loop
auto source = base->pin.lock(); auto source = base->pin.lock();
auto [ticked, remove, noworkers] = source->DoWork(readData, writeData); if (all)
bTicked += ticked;
if (ticked)
{ {
if (optOut) bool bTriggered = source->CanDispatchLayer(readData, writeData);
if (!bTriggered)
{ {
optOut->push_back(source->source); continue;
} }
}
if (remove)
{
this->sourceMutex_->UpgradeReadToWrite(0);
AuTryRemove(this->sources_, source);
this->sourceMutex_->DowngradeWriteToRead();
}
if (remove)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{ {
epoll->Remove(source.get(), readData, writeData); this->sourceMutex_->UpgradeReadToWrite(0);
AuTryRemove(this->sources_, source);
this->sourceMutex_->DowngradeWriteToRead();
} }
}
// Fire waitall
// not sure i like how this fires all anys and alls.
// this isnt consistent
if (noworkers)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{ {
if (epoll != &this->globalEpoll_) AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{ {
epoll->Remove(source.get(), readData, writeData); epoll->Remove(source.get(), readData, writeData);
} }
} }
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
if (epoll != &this->globalEpoll_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
}
bTicked += 1;
pAllEpoll->done.push_back(source);
}
else
{
auto [ticked, remove, noworkers] = source->DoWork(readData, writeData);
bTicked += ticked;
if (ticked)
{
if (optOut)
{
optOut->push_back(source->source);
}
}
if (remove)
{
this->sourceMutex_->UpgradeReadToWrite(0);
AuTryRemove(this->sources_, source);
this->sourceMutex_->DowngradeWriteToRead();
}
if (remove)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
// Fire waitall
// not sure i like how this fires all anys and alls.
// this isnt consistent
if (noworkers)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
if (epoll != &this->globalEpoll_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
}
} }
} }
@ -879,11 +970,46 @@ namespace Aurora::IO::Loop
} }
} }
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(int fd) bool LoopQueue::SourceExtended::CanDispatchLayer(bool read, bool write)
{ {
bool bShouldRemove {true}; if (!this->sourceExtended)
AuUInt8 uPosition {}; {
return false;
}
if (this->sourceExtended->Singular())
{
AuPair<bool, bool> ret;
bool bSingleOnlyFlag {true};
if (read)
{
if (CanDispatchLayer(this->sourceExtended->GetHandle()))
{
return true;
}
}
if (write)
{
if (CanDispatchLayer(this->sourceExtended->GetWriteHandle()))
{
return true;
}
}
return false;
}
else
{
// Whatever, I doubt implementing this is worth the perf hit
return CanDispatchLayer(-1);
}
}
bool LoopQueue::SourceExtended::CanDispatchLayer(int fd)
{
if (!this->bHasCommited) if (!this->bHasCommited)
{ {
return {}; return {};
@ -897,6 +1023,23 @@ namespace Aurora::IO::Loop
} }
} }
return true;
}
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(int fd)
{
if (!CanDispatchLayer(fd))
{
return {};
}
return DoWorkLate();
}
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWorkLate()
{
bool bShouldRemove {true};
AuUInt8 uPosition {};
bool bOverload {}; bool bOverload {};
if ((this->subscribers.empty()) && if ((this->subscribers.empty()) &&

View File

@ -46,7 +46,11 @@ namespace Aurora::IO::Loop
AuUInt32 WaitAny(AuUInt32 timeout) override; AuUInt32 WaitAny(AuUInt32 timeout) override;
AuList<AuSPtr<ILoopSource>> WaitAnyEx(AuUInt32 timeout) override; AuList<AuSPtr<ILoopSource>> WaitAnyEx(AuUInt32 timeout) override;
AuUInt32 DoTick(AuUInt64, AuList<AuSPtr<ILoopSource>> *optOut = nullptr, bool *tryAgain = nullptr, bool async = false); private:
struct AnEpoll;
public:
AuUInt32 DoTick(AuUInt64, AuList<AuSPtr<ILoopSource>> *optOut = nullptr, bool *tryAgain = nullptr, bool async = false, bool all = false, AnEpoll *pAllEpoll = nullptr);
private: private:
@ -97,15 +101,20 @@ namespace Aurora::IO::Loop
// ticked, should remove // ticked, should remove
AuTuple<bool, bool, bool> DoWork(int fd); AuTuple<bool, bool, bool> DoWork(int fd);
AuTuple<bool, bool, bool> DoWork(bool read, bool write); AuTuple<bool, bool, bool> DoWork(bool read, bool write);
AuTuple<bool, bool, bool> DoWorkLate();
bool CanDispatchLayer(int fd);
bool CanDispatchLayer(bool read, bool write);
}; };
struct AnEpoll struct AnEpoll
{ {
LoopQueue *parent {}; LoopQueue *parent {};
bool bAll {};
AuThreadPrimitives::SpinLock lock; AuThreadPrimitives::SpinLock lock;
AuBST<int, int> startingWorkRead; AuBST<int, int> startingWorkRead;
AuBST<int, int> startingWorkWrite; AuBST<int, int> startingWorkWrite;
AuList<AuSPtr<SourceExtended>> done;
void Add(SourceExtended *source); void Add(SourceExtended *source);
void Remove(SourceExtended *source, bool readData, bool writeData); void Remove(SourceExtended *source, bool readData, bool writeData);