1023 lines
28 KiB
C++
1023 lines
28 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: LoopQueue.Linux.cpp
|
|
Date: 2022-4-5
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "Loop.NT.hpp"
|
|
#include "ILoopSourceEx.hpp"
|
|
#include "LoopQueue.Linux.hpp"
|
|
#include <sys/epoll.h>
|
|
#include <Source/Time/Time.hpp>
|
|
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
|
|
|
|
#if defined(AURORA_COMPILER_CLANG)
|
|
// warning: ISO C++20 considers use of overloaded operator '!=' (with operand types 'AuSPtr<Aurora::IO::Loop::ILoopSource>' (aka 'ExSharedPtr<Aurora::IO::Loop::ILoopSource, std::shared_ptr<ILoopSource>>') and 'typename tuple_element<0UL, tuple<ExSharedPtr<ILoopSource, shared_ptr<ILoopSource>>, ExSharedPtr<ILoopSourceSubscriber, shared_ptr<ILoopSourceSubscriber>>, ExSharedPtr<ILoopSourceSubscriberEx, shared_ptr<ILoopSourceSubscriberEx>>>>::type' (aka '__type_pack_element<0UL, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSource, std::shared_ptr<Aurora::IO::Loop::ILoopSource>>, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSourceSubscriber, std::shared_ptr<Aurora::IO::Loop::ILoopSourceSubscriber>>, Aurora::Memory::ExSharedPtr<Aurora::IO::Loop::ILoopSourceSubscriberEx, std::shared_ptr<Aurora::IO::Loop::ILoopSourceSubscriberEx>>>')) to be ambiguous despite there being a unique best viable function with non-reversed arguments [-Wambiguous-reversed-operator]
|
|
#pragma clang diagnostic ignored "-Wambiguous-reversed-operator"
|
|
// Yea, I couldn't give less of a nanoshit what some C++20 spec says. Even llvm/clang doesn't care to language police it into a fatal unimplemented compiler condition. So, idc.
|
|
#endif
|
|
|
|
namespace Aurora::IO::Loop
|
|
{
|
|
// On Linux, Loop Queues are glorified eventfd to epoll adapters.
|
|
// Requeuing the cached fd array per frame on the TLS io_submit object
|
|
// would be more costly than maintaining an epoll for all fds tracked
|
|
// by the loop queue.
|
|
// We can delegate the wait functions to an NT overlapped like shim
|
|
// where all eventfds are one epoll handle
|
|
// The TLS worker would get awoken by any changes in the epoll queue
|
|
// or if the io submit object should preemptively abort
|
|
// The TLS worker would remain resposible for scheduling thread local
|
|
// network and file transactions independent from the loop queues
|
|
// As such, loop queues continue to be defined as a mechanism to merely
|
|
// wait, not dispatch/manage work
|
|
// Delegating mutex reads to a single io_submit would be a linux-specific
|
|
// kevent-non-reusable ThreadWorkerQueueShim hack
|
|
// ...it wouldn't make sense create another loop queue per thread concept
|
|
// outside of the async subsystem (not counting TLS overlapped io)
|
|
|
|
LoopQueue::LoopQueue() : lockStealer_(false, false, true)
|
|
{
|
|
|
|
}
|
|
|
|
LoopQueue::~LoopQueue()
|
|
{
|
|
Deinit();
|
|
}
|
|
|
|
bool LoopQueue::Init()
|
|
{
|
|
this->epollFd_ = epoll_create1(EPOLL_CLOEXEC);
|
|
if (this->epollFd_ == -1)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
this->sourceMutex_ = AuThreadPrimitives::RWLockUnique();
|
|
if (!this->sourceMutex_)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
this->polledItemsMutex_ = AuThreadPrimitives::RWLockUnique();
|
|
if (!this->polledItemsMutex_)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
this->globalEpoll_.parent = this;
|
|
|
|
return AuTryInsert(this->alternativeEpolls_, &this->globalEpoll_);
|
|
}
|
|
|
|
void LoopQueue::Deinit()
|
|
{
|
|
int fd;
|
|
if ((fd = AuExchange(this->epollFd_, -1)) != -1)
|
|
{
|
|
::close(fd);
|
|
}
|
|
}
|
|
|
|
void LoopQueue::AnEpoll::Add(SourceExtended *source)
|
|
{
|
|
epoll_event event;
|
|
auto ex = source->sourceExtended;
|
|
|
|
if (!ex)
|
|
{
|
|
return;
|
|
}
|
|
|
|
event.data.ptr = source;
|
|
|
|
if (ex->Singular())
|
|
{
|
|
bool bDouble {};
|
|
int oldReadRef {};
|
|
int oldWriteRef {};
|
|
|
|
auto read = ex->GetHandle();
|
|
if (read != -1)
|
|
{
|
|
oldReadRef = startingWorkRead[read]++;
|
|
bDouble |= startingWorkWrite.find(read) != startingWorkWrite.end();
|
|
}
|
|
|
|
auto write = ex->GetWriteHandle();
|
|
if (write != -1)
|
|
{
|
|
oldWriteRef = startingWorkWrite[write]++;
|
|
bDouble |= startingWorkRead.find(write) != startingWorkRead.end();
|
|
}
|
|
|
|
if (bDouble)
|
|
{
|
|
epoll_event event;
|
|
event.events = EPOLLOUT | EPOLLIN;
|
|
event.data.ptr = source;
|
|
|
|
if ((oldReadRef == 0) && (oldWriteRef == 0))
|
|
{
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, write, &event);
|
|
}
|
|
else
|
|
{
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, write, &event);
|
|
}
|
|
}
|
|
|
|
if ((write != -1) && (!oldWriteRef))
|
|
{
|
|
event.events = EPOLLOUT;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, write, &event);
|
|
}
|
|
|
|
if ((read != -1) && (!oldReadRef))
|
|
{
|
|
event.events = EPOLLIN;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, read, &event);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
auto read = ex->GetHandles();
|
|
auto write = ex->GetWriteHandles();
|
|
|
|
for (auto readHandle : read)
|
|
{
|
|
auto count = startingWorkRead[readHandle]++;
|
|
|
|
if (count)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
if (AuExists(write, readHandle))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
event.events = EPOLLIN;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, readHandle, &event);
|
|
}
|
|
|
|
for (auto writeHandle : write)
|
|
{
|
|
auto count = startingWorkWrite[writeHandle]++;
|
|
|
|
if (count)
|
|
{
|
|
if (AuExists(read, writeHandle))
|
|
{
|
|
event.events = EPOLLOUT | EPOLLIN;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, writeHandle, &event);
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if (AuExists(read, writeHandle))
|
|
{
|
|
event.events = EPOLLOUT | EPOLLIN;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, writeHandle, &event);
|
|
}
|
|
else
|
|
{
|
|
event.events = EPOLLOUT;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_ADD, writeHandle, &event);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
bool LoopQueue::SourceAdd(const AuSPtr<ILoopSource> &source)
|
|
{
|
|
return SourceAddWithTimeout(source, 0);
|
|
}
|
|
|
|
bool LoopQueue::SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 ms)
|
|
{
|
|
AU_LOCK_GUARD(this->commitQueueMutex_);
|
|
return this->SourceAddWithTimeoutEx(source, ms);
|
|
}
|
|
|
|
bool LoopQueue::SourceAddWithTimeoutEx(const AuSPtr<ILoopSource> &source, AuUInt32 ms)
|
|
{
|
|
this->lockStealer_.Set();
|
|
#if 0
|
|
AU_LOCK_GUARD(this->sourceMutex_->AsWritable());
|
|
#else
|
|
auto pWaitable = this->sourceMutex_->AsWritable();
|
|
auto pLocked = pWaitable->TryLock();
|
|
if (pLocked)
|
|
#endif
|
|
{
|
|
this->lockStealer_.Reset();
|
|
|
|
auto src = AuMakeShared<SourceExtended>(this, source);
|
|
if (!src)
|
|
{
|
|
pWaitable->Unlock();
|
|
return false;
|
|
}
|
|
|
|
if (ms)
|
|
{
|
|
src->timeoutAbs = (AuUInt64)ms + AuTime::SteadyClockMS();
|
|
}
|
|
|
|
if (!AuTryInsert(this->sources_, src))
|
|
{
|
|
pWaitable->Unlock();
|
|
return false;
|
|
}
|
|
|
|
this->globalEpoll_.Add(src.get());
|
|
pWaitable->Unlock();
|
|
return true;
|
|
}
|
|
#if 1
|
|
else
|
|
{
|
|
return AuTryInsert(this->pendingBlocking_, AuMakePair(source, ms));
|
|
}
|
|
#endif
|
|
}
|
|
|
|
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
|
|
{
|
|
AU_LOCK_GUARD(this->commitQueueMutex_);
|
|
return AuTryInsert(this->decommitQueue_, source);
|
|
}
|
|
|
|
AuUInt32 LoopQueue::GetSourceCount()
|
|
{
|
|
return this->sources_.size();
|
|
}
|
|
|
|
bool LoopQueue::AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber)
|
|
{
|
|
AU_LOCK_GUARD(this->commitQueueMutex_);
|
|
return AuTryInsert(this->commitPending_, AuMakeTuple(source, subscriber, AuSPtr<ILoopSourceSubscriberEx>{}));
|
|
}
|
|
|
|
bool LoopQueue::AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber)
|
|
{
|
|
AU_LOCK_GUARD(this->commitQueueMutex_);
|
|
return AuTryInsert(this->commitPending_, AuMakeTuple(source, AuSPtr<ILoopSourceSubscriber>{}, subscriber));
|
|
}
|
|
|
|
bool LoopQueue::AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber)
|
|
{
|
|
AU_LOCK_GUARD(this->globalLockMutex_);
|
|
return AuTryInsert(this->allSubscribers_, subscriber);
|
|
}
|
|
|
|
void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount)
|
|
{
|
|
// Intentionally NO-OP under Linux
|
|
}
|
|
|
|
void LoopQueue::ChugHint(bool value)
|
|
{
|
|
// Intentionally NO-OP under Linux
|
|
}
|
|
|
|
bool LoopQueue::CommitDecommit()
|
|
{
|
|
AuUInt32 dwSuccess {};
|
|
|
|
if (this->decommitQueue_.empty())
|
|
{
|
|
return true;
|
|
}
|
|
|
|
auto decommitQueue = AuExchange(this->decommitQueue_, {});
|
|
|
|
for (auto sourceExtended : sources_)
|
|
{
|
|
bool bFound {};
|
|
for (auto decommit : decommitQueue)
|
|
{
|
|
if (decommit == sourceExtended->source)
|
|
{
|
|
bFound = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!bFound)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
|
|
for (auto epoll : this->alternativeEpolls_)
|
|
{
|
|
epoll->Remove(sourceExtended.get(), true, true);
|
|
}
|
|
|
|
dwSuccess++;
|
|
}
|
|
|
|
// TODO (Reece): Urgent. Fails under an IO update dtor. Faking perfect unit tests until i make it. Need linux aurt.
|
|
//SysAssertDbg(dwSuccess == decommitQueue.size(), "caught SourceRemove on invalid");
|
|
|
|
return dwSuccess;
|
|
}
|
|
|
|
bool LoopQueue::Commit()
|
|
{
|
|
AU_LOCK_GUARD(this->commitQueueMutex_);
|
|
|
|
for (const auto & [pSource, ms] : AuExchange(this->pendingBlocking_, {}))
|
|
{
|
|
this->SourceAddWithTimeoutEx(pSource, ms);
|
|
}
|
|
|
|
this->lockStealer_.Set();
|
|
auto pWritable = this->sourceMutex_->AsWritable();
|
|
if (pWritable->TryLock())
|
|
{
|
|
this->lockStealer_.Reset();
|
|
|
|
if (!CommitDecommit())
|
|
{
|
|
//pWritable->Unlock();
|
|
//return false;
|
|
}
|
|
|
|
auto pending = AuExchange(this->commitPending_, {});
|
|
|
|
for (auto &source : this->sources_)
|
|
{
|
|
for (auto itr = pending.begin(); itr != pending.end(); )
|
|
{
|
|
if (source->source != AuGet<0>(*itr))
|
|
{
|
|
itr ++;
|
|
continue;
|
|
}
|
|
|
|
auto a = AuGet<1>(*itr);
|
|
if (a)
|
|
{
|
|
if (!AuTryInsert(source->subscribers, a))
|
|
{
|
|
this->commitPending_ = AuMove(this->commitPending_);
|
|
pWritable->Unlock();
|
|
return false;
|
|
}
|
|
}
|
|
|
|
auto b = AuGet<2>(*itr);
|
|
if (b)
|
|
{
|
|
if (!AuTryInsert(source->subscriberExs, b))
|
|
{
|
|
// 1 and 2 are mutually exclusive, dont worry about clean up
|
|
this->commitPending_ = AuMove(this->commitPending_);
|
|
pWritable->Unlock();
|
|
return false;
|
|
}
|
|
}
|
|
|
|
itr = pending.erase(itr);
|
|
}
|
|
|
|
source->Commit(source);
|
|
}
|
|
pWritable->Unlock();
|
|
}
|
|
else
|
|
{
|
|
this->bRecommitLater = true;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool LoopQueue::IsSignaledPeek()
|
|
{
|
|
fd_set readSet;
|
|
struct timeval tv {};
|
|
|
|
FD_ZERO(&readSet);
|
|
FD_SET(this->epollFd_, &readSet);
|
|
|
|
auto active = select(this->epollFd_ + 1, &readSet, NULL, NULL, &tv);
|
|
if (active == -1)
|
|
{
|
|
// todo push error
|
|
return false;
|
|
}
|
|
|
|
return active == 1;
|
|
}
|
|
|
|
// This could be implemented more like a possible BSD implementation
|
|
// if we were to implement based on io_submit poll
|
|
bool LoopQueue::WaitAll(AuUInt32 timeoutIn)
|
|
{
|
|
AnEpoll epollReference;
|
|
{
|
|
AU_LOCK_GUARD(this->globalEpoll_.lock);
|
|
epollReference = this->globalEpoll_;
|
|
}
|
|
epollReference.lock = {};
|
|
|
|
AuUInt64 timeout {timeoutIn};
|
|
if (timeout)
|
|
{
|
|
timeout += AuTime::SteadyClockMS();
|
|
}
|
|
|
|
{
|
|
AU_LOCK_GUARD(this->polledItemsMutex_->AsWritable());
|
|
AuTryInsert(this->alternativeEpolls_, &epollReference);
|
|
}
|
|
|
|
bool anythingLeft {};
|
|
bool bTimeout {};
|
|
do
|
|
{
|
|
anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size();
|
|
if (!anythingLeft) return true;
|
|
|
|
//WaitAny(0);
|
|
// [==========] 1 test from 1 test suite ran. (11100 ms total)
|
|
// ...and a turbojet
|
|
|
|
//bool bTryAgain {};
|
|
//DoTick(timeout, {}, &bTryAgain);
|
|
// ...and + ~10ms latency
|
|
|
|
//bool bTryAgain {};
|
|
//DoTick(AuMin(AuUInt64(AuTime::CurrentClockMS() + 4), timeout), {}, &bTryAgain);
|
|
// [----------] 1 test from Loop (11101 ms total)
|
|
// ...and no jet engine (+ lower latency than windows)
|
|
|
|
|
|
bool bTryAgain {};
|
|
DoTick(timeout, {}, &bTryAgain);
|
|
PumpHooks();
|
|
// but this hack should apply to wait any as well, so i'm moving it to the DoTick function
|
|
|
|
anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size();
|
|
bTimeout = timeout ? AuTime::SteadyClockMS() >= timeout : false;
|
|
} while (anythingLeft && !bTimeout);
|
|
|
|
{
|
|
AU_LOCK_GUARD(this->polledItemsMutex_->AsWritable());
|
|
SysAssert(AuTryRemove(this->alternativeEpolls_, &epollReference));
|
|
}
|
|
|
|
return !anythingLeft;
|
|
}
|
|
|
|
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeoutIn)
|
|
{
|
|
AuUInt64 timeout = timeoutIn;
|
|
|
|
if (timeout)
|
|
{
|
|
timeout += AuTime::SteadyClockMS();
|
|
}
|
|
|
|
AuUInt32 cTicked {};
|
|
bool bTryAgain {};
|
|
do
|
|
{
|
|
bTryAgain = false;
|
|
AuUInt32 ticked = DoTick(timeout, {}, &bTryAgain);
|
|
PumpHooks();
|
|
cTicked += ticked;
|
|
} while (bTryAgain);
|
|
|
|
return cTicked;
|
|
}
|
|
|
|
AuUInt32 LoopQueue::PumpNonblocking()
|
|
{
|
|
AuUInt32 cTicked {};
|
|
bool bTryAgain {};
|
|
|
|
do
|
|
{
|
|
bTryAgain = false;
|
|
AuUInt32 ticked = DoTick(0, {}, &bTryAgain, true);
|
|
PumpHooks();
|
|
cTicked += ticked;
|
|
} while (bTryAgain);
|
|
|
|
return cTicked;
|
|
}
|
|
|
|
AuList<AuSPtr<ILoopSource>> LoopQueue::PumpNonblockingEx()
|
|
{
|
|
AuList<AuSPtr<ILoopSource>> ret;
|
|
bool bTryAgain {};
|
|
|
|
do
|
|
{
|
|
bTryAgain = false;
|
|
AuUInt32 ticked = DoTick(0, &ret, &bTryAgain, true);
|
|
PumpHooks();
|
|
} while (bTryAgain);
|
|
|
|
return ret;
|
|
}
|
|
|
|
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeoutIn)
|
|
{
|
|
AuList<AuSPtr<ILoopSource>> ret;
|
|
AuUInt64 timeout = timeoutIn;
|
|
|
|
if (timeout)
|
|
{
|
|
timeout += AuTime::SteadyClockMS();
|
|
}
|
|
|
|
bool bTryAgain {};
|
|
do
|
|
{
|
|
bTryAgain = false;
|
|
AuUInt32 ticked = DoTick(timeout, &ret, &bTryAgain);
|
|
PumpHooks();
|
|
} while (bTryAgain);
|
|
|
|
return ret;
|
|
}
|
|
|
|
void LoopQueue::AnEpoll::Remove(SourceExtended *source, bool readData, bool writeData)
|
|
{
|
|
if (!source->sourceExtended)
|
|
{
|
|
return;
|
|
}
|
|
|
|
auto ex = source->sourceExtended;
|
|
AU_LOCK_GUARD(this->lock);
|
|
bool bIsRoot = this == &this->parent->globalEpoll_;
|
|
if (readData)
|
|
{
|
|
for (auto i = startingWorkRead.begin(); i != startingWorkRead.end(); )
|
|
{
|
|
bool doesntMatch {};
|
|
|
|
auto &fd = i->first;
|
|
auto &usage = i->second;
|
|
|
|
if (ex->Singular())
|
|
{
|
|
doesntMatch = fd != ex->GetHandle();
|
|
}
|
|
else
|
|
{
|
|
doesntMatch = !AuExists(ex->GetHandles(), fd);
|
|
}
|
|
|
|
if (doesntMatch)
|
|
{
|
|
i++;
|
|
continue;
|
|
}
|
|
|
|
if ((--(usage)) != 0)
|
|
{
|
|
i++;
|
|
continue;
|
|
}
|
|
|
|
if (bIsRoot)
|
|
{
|
|
if (startingWorkWrite.find(fd) == startingWorkWrite.end())
|
|
{
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_DEL, fd, nullptr);
|
|
}
|
|
else
|
|
{
|
|
epoll_event event;
|
|
event.events = EPOLLOUT;
|
|
event.data.ptr = source;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, fd, &event);
|
|
}
|
|
}
|
|
|
|
i = startingWorkRead.erase(i);
|
|
}
|
|
}
|
|
|
|
if (writeData)
|
|
{
|
|
for (auto i = startingWorkWrite.begin(); i != startingWorkWrite.end(); )
|
|
{
|
|
bool doesntMatch {};
|
|
|
|
auto &fd = i->first;
|
|
auto &usage = i->second;
|
|
|
|
if (ex->Singular())
|
|
{
|
|
doesntMatch = fd != ex->GetWriteHandle();
|
|
}
|
|
else
|
|
{
|
|
doesntMatch = !AuExists(ex->GetWriteHandles(), fd);
|
|
}
|
|
|
|
if (doesntMatch )
|
|
{
|
|
i++;
|
|
continue;
|
|
}
|
|
|
|
if ((--(usage)) != 0)
|
|
{
|
|
i++;
|
|
continue;
|
|
}
|
|
|
|
if (bIsRoot)
|
|
{
|
|
if (startingWorkRead.find(fd) == startingWorkRead.end())
|
|
{
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_DEL, fd, nullptr);
|
|
}
|
|
else
|
|
{
|
|
epoll_event event;
|
|
event.events = EPOLLIN;
|
|
event.data.ptr = source;
|
|
epoll_ctl(this->parent->epollFd_, EPOLL_CTL_MOD, fd, &event);
|
|
}
|
|
}
|
|
|
|
i = startingWorkWrite.erase(i);
|
|
}
|
|
}
|
|
}
|
|
|
|
AuUInt32 LoopQueue::DoTick(AuUInt64 time, AuList<AuSPtr<ILoopSource>> *optOut, bool *tryAgain, bool nonblock)
|
|
{
|
|
AuUInt32 bTicked {};
|
|
AuUInt64 now {};
|
|
epoll_event events[128];
|
|
|
|
AU_LOCK_GUARD(this->sourceMutex_->AsReadable());
|
|
|
|
for (const auto & source : this->sources_)
|
|
{
|
|
if (source->sourceExtended)
|
|
{
|
|
source->sourceExtended->OnPresleep();
|
|
}
|
|
}
|
|
|
|
AuInt64 deltaMS = 0;
|
|
if (time)
|
|
{
|
|
deltaMS = AuMin(AuInt64(4), (AuInt64)time - (AuInt64)AuTime::SteadyClockMS());
|
|
if (deltaMS < 0)
|
|
{
|
|
deltaMS = 0;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
deltaMS = nonblock ? 0 : -1;
|
|
}
|
|
|
|
int iEvents = IO::UNIX::LinuxOverlappedEpollShim(this->epollFd_, events, AuArraySize(events), deltaMS);
|
|
if (iEvents == -1)
|
|
{
|
|
goto out;
|
|
}
|
|
|
|
for (int i = 0; i < iEvents; i++)
|
|
{
|
|
bool readData = events[i].events & EPOLLIN;
|
|
bool writeData = events[i].events & EPOLLOUT;
|
|
|
|
auto handle = events[i].data.ptr;
|
|
if (!handle)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
auto base = AuReinterpretCast<SourceExtended *>(handle);
|
|
if (!base->bHasCommited)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
auto source = base->pin.lock();
|
|
|
|
auto [ticked, remove, noworkers] = source->DoWork(readData, writeData);
|
|
bTicked += ticked;
|
|
|
|
if (ticked)
|
|
{
|
|
if (optOut)
|
|
{
|
|
optOut->push_back(source->source);
|
|
}
|
|
}
|
|
|
|
if (remove)
|
|
{
|
|
this->sourceMutex_->UpgradeReadToWrite(0);
|
|
AuTryRemove(this->sources_, source);
|
|
this->sourceMutex_->DowngradeWriteToRead();
|
|
}
|
|
|
|
if (remove)
|
|
{
|
|
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
|
|
for (auto epoll : this->alternativeEpolls_)
|
|
{
|
|
epoll->Remove(source.get(), readData, writeData);
|
|
}
|
|
}
|
|
|
|
// Fire waitall
|
|
// not sure i like how this fires all anys and alls.
|
|
// this isnt consistent
|
|
if (noworkers)
|
|
{
|
|
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
|
|
for (auto epoll : this->alternativeEpolls_)
|
|
{
|
|
if (epoll != &this->globalEpoll_)
|
|
{
|
|
epoll->Remove(source.get(), readData, writeData);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
now = AuTime::SteadyClockMS();
|
|
|
|
if (!bTicked)
|
|
{
|
|
if (tryAgain)
|
|
{
|
|
*tryAgain = ((this->lockStealer_.IsSignaled()) ||
|
|
(now < time));
|
|
}
|
|
}
|
|
|
|
out:
|
|
|
|
if (!now)
|
|
{
|
|
now = AuTime::SteadyClockMS();
|
|
}
|
|
|
|
for (auto itr = this->sources_.begin(); itr != this->sources_.end(); )
|
|
{
|
|
AuSPtr<SourceExtended> source = *itr;
|
|
bool remove {};
|
|
|
|
if (!remove)
|
|
{
|
|
remove = source->ConsiderTimeout(now);
|
|
}
|
|
|
|
if (remove)
|
|
{
|
|
this->sourceMutex_->UpgradeReadToWrite(0);
|
|
itr = this->sources_.erase(itr);
|
|
this->sourceMutex_->DowngradeWriteToRead();
|
|
}
|
|
|
|
if (remove)
|
|
{
|
|
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
|
|
for (auto epoll : this->alternativeEpolls_)
|
|
{
|
|
epoll->Remove(source.get(), true, true);
|
|
}
|
|
}
|
|
|
|
if (source->sourceExtended)
|
|
{
|
|
source->sourceExtended->OnFinishSleep();
|
|
}
|
|
|
|
if (!remove)
|
|
{
|
|
itr ++;
|
|
}
|
|
}
|
|
|
|
return bTicked;
|
|
}
|
|
|
|
|
|
LoopQueue::SourceExtended::SourceExtended(LoopQueue *parent, const AuSPtr<ILoopSource> &source) :
|
|
parent(parent),
|
|
source(source)
|
|
{
|
|
this->sourceExtended = AuDynamicCast<ILoopSourceEx>(source.get());
|
|
}
|
|
|
|
LoopQueue::SourceExtended::~SourceExtended()
|
|
{
|
|
Deinit();
|
|
}
|
|
|
|
void LoopQueue::SourceExtended::Deinit()
|
|
{
|
|
this->pin.reset();
|
|
}
|
|
|
|
void LoopQueue::SourceExtended::Commit(const AuSPtr<SourceExtended> &self)
|
|
{
|
|
this->pin = self;
|
|
this->bHasCommited = true;
|
|
}
|
|
|
|
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(bool read, bool write)
|
|
{
|
|
if (!this->sourceExtended)
|
|
{
|
|
return DoWork(-1);
|
|
}
|
|
|
|
if (this->sourceExtended->Singular())
|
|
{
|
|
AuPair<bool, bool> ret;
|
|
|
|
bool bSingleOnlyFlag {true};
|
|
|
|
if (read)
|
|
{
|
|
auto [a, b, c] = DoWork(this->sourceExtended->GetHandle());
|
|
ret.first |= a;
|
|
ret.second |= b;
|
|
bSingleOnlyFlag &= c;
|
|
}
|
|
|
|
if (write)
|
|
{
|
|
auto [a, b, c] = DoWork(this->sourceExtended->GetWriteHandle());
|
|
ret.first |= a;
|
|
ret.second |= b;
|
|
bSingleOnlyFlag &= c;
|
|
}
|
|
|
|
return AuMakeTuple(AuGet<0>(ret), AuGet<1>(ret), bSingleOnlyFlag);
|
|
}
|
|
else
|
|
{
|
|
// Whatever, I doubt implementing this is worth the perf hit
|
|
return DoWork(-1);
|
|
}
|
|
}
|
|
|
|
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(int fd)
|
|
{
|
|
bool bShouldRemove {true};
|
|
AuUInt8 uPosition {};
|
|
|
|
if (!this->bHasCommited)
|
|
{
|
|
return {};
|
|
}
|
|
|
|
if (this->sourceExtended)
|
|
{
|
|
if (!this->sourceExtended->OnTrigger(fd))
|
|
{
|
|
return {};
|
|
}
|
|
}
|
|
|
|
bool bOverload {};
|
|
|
|
if ((this->subscribers.empty()) &&
|
|
(this->subscriberExs.empty()))
|
|
{
|
|
bOverload = true;
|
|
}
|
|
|
|
// Notify callbacks...
|
|
|
|
for (auto itr = this->subscriberExs.begin();
|
|
itr != this->subscriberExs.end(); )
|
|
{
|
|
bool result;
|
|
auto handler = *itr;
|
|
|
|
try
|
|
{
|
|
result = handler->OnFinished(this->source, uPosition++);
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
|
|
bShouldRemove &= result;
|
|
|
|
if (result)
|
|
{
|
|
itr = this->subscriberExs.erase(itr);
|
|
}
|
|
else
|
|
{
|
|
itr++;
|
|
}
|
|
}
|
|
|
|
for (auto itr = this->subscribers.begin();
|
|
itr != this->subscribers.end(); )
|
|
{
|
|
bool result;
|
|
auto handler = *itr;
|
|
|
|
try
|
|
{
|
|
result = handler->OnFinished(this->source);
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
|
|
bShouldRemove &= result;
|
|
|
|
if (result)
|
|
{
|
|
itr = this->subscribers.erase(itr);
|
|
}
|
|
else
|
|
{
|
|
itr++;
|
|
}
|
|
}
|
|
|
|
// Evict when subs count hit zero, not when sub count started off at zero
|
|
if (bOverload)
|
|
{
|
|
bShouldRemove = false;
|
|
}
|
|
|
|
// Notify global subscribers, allowing them to preempt removal
|
|
if (bShouldRemove || bOverload)
|
|
{
|
|
AU_LOCK_GUARD(this->parent->globalLockMutex_);
|
|
for (const auto &handler : this->parent->allSubscribers_)
|
|
{
|
|
try
|
|
{
|
|
bShouldRemove &= handler->OnFinished(this->source);
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
}
|
|
return AuMakeTuple(true, bShouldRemove, bOverload);
|
|
}
|
|
|
|
bool LoopQueue::AddHook(const AuFunction<void()> &func)
|
|
{
|
|
return AuTryInsert(this->epilogueHooks_, func);
|
|
}
|
|
|
|
void LoopQueue::PumpHooks()
|
|
{
|
|
if (AuExchange(this->bRecommitLater, false))
|
|
{
|
|
this->Commit();
|
|
}
|
|
|
|
auto c = AuExchange(this->epilogueHooks_, {});
|
|
for (auto &a : c)
|
|
{
|
|
a();
|
|
}
|
|
}
|
|
|
|
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
|
|
{
|
|
auto queue = AuMakeShared<LoopQueue>();
|
|
if (!queue)
|
|
{
|
|
return {};
|
|
}
|
|
|
|
if (!queue->Init())
|
|
{
|
|
return {};
|
|
}
|
|
|
|
return queue;
|
|
}
|
|
} |