[*] Gave the NT loop some attention (much more required)

This commit is contained in:
Reece Wilson 2022-03-30 18:56:56 +01:00
parent f9ac6fff72
commit 79c2a2ffff
16 changed files with 530 additions and 295 deletions

View File

@ -91,18 +91,10 @@ namespace Aurora::Loop
*/
virtual bool AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber) = 0;
/**
* @brief Indicates whether or not all the loop sources are in a signaled state and all
* relevant interfaces, such as internal IPC handlers and ILoopSourceSubscribers,
* have received the update. Merely waiting for all loop sources to finish is not
* sufficient. One must process the signaled states by calling a Wait function
* when the HasAnyPending() condition is valid.
* @return
*/
virtual bool HasFinished() = 0;
/**
* @brief Nonblocking wait-any for all objects in the loop queue
* @warning (may yield to ILoopSourceSubscriber delegate on the current context)
* @return
*/
virtual bool IsSignaled() = 0;
@ -110,8 +102,8 @@ namespace Aurora::Loop
/**
* @brief Waits on all the submitted loop sources until they are all complete or until the timeout has finished.
* Note: the completion of another Wait[All/Any[Ex]] call may result in a
* @param timeout
* @return
* @warning (may yield to ILoopSourceSubscriber delegate on the current context)
*/
virtual bool WaitAll (AuUInt32 timeout = 0) = 0;
@ -119,6 +111,7 @@ namespace Aurora::Loop
* @brief Waits on all the loop sources until at least one is signaled
* @param timeout
* @return
* @warning (may yield to ILoopSourceSubscriber delegate on the current context)
*/
virtual AuUInt32 WaitAny (AuUInt32 timeout = 0) = 0;
@ -126,20 +119,21 @@ namespace Aurora::Loop
* @brief
* @param timeout
* @return
* @warning (may yield to ILoopSourceSubscriber delegate on the current context)
*/
virtual AuList<AuSPtr<ILoopSource>> WaitAnyEx(AuUInt32 timeout = 0) = 0;
/**
* @brief
* Mostly relevant on Windows where the limit is 64 versus horrible BSD-y socket
* optimized interfaces that can handle thousands of fds
* Mostly relevant on Windows where the limit is 64 versus horrible UNIX-y socket
* optimized interfaces that can handle thousands of fds
*
* In large loop queues, a maximum of
* `sectionTickTime * (sectionDequeCount + clamp(platformSanity, sectionDequeCount))`
* kernel objects can be checked within a given time frame.
*
* What this effectively does is iterate over a subdivided work queue, aligned
* to platformSanity, [i, i+platform sanity] objects can be safely waitany on.
* to platformSanity (NT: 64); [i, i+platform sanity] objects can then be waited on.
*
* On timeout of sectionTickTime,
* [i, i+sectionDequeCount] of the tick could be checked, however, we know
@ -148,16 +142,14 @@ namespace Aurora::Loop
* we've already spent the maximum allocated time to poll on a mere subset,
* we may as well desperately check sectionDequeCount worth of objects, and
* write-off the time spent blocking. That way, we get upto platform sanity
* of is signaled checks for free in the worst case scenario. The perspective
* that the overshoot is expensive doesn't make sense, when the is signaled
* check is basically free and the timeout on a subset of the req was the
* bottleneck.
* of is-signaled checks for free in the worst case scenario. The perspective
* that the overshoot is expensive doesn't make sense. The is-signaled
* check is basically free and the initial timeout/yield-to-kern over set 0
* was the bottleneck
*
* Otherwise on success, we know to check in the range of [i, platform sanity
* for at least one alerted object.
*
* Looking anywhere else given any help from a waitany interface would be stupid.
*
* Internal logic will dequeue platform coefficient aligned handles, and the timeout
* optimization shall check [i+platform sanity, i+platform sanity+sectionDequeCount].
* 1 handle =/= 1 loop source, tho usually 1 loop source ~= 1 handle

View File

@ -14,6 +14,8 @@ namespace Aurora::Loop
virtual void OnPresleep() = 0;
virtual bool OnTrigger(AuUInt handle) = 0;
virtual void OnFinishSleep() = 0;
virtual bool Singular() = 0;
virtual AuUInt GetHandle() = 0;
virtual AuList<AuUInt> GetHandles() = 0;
};
}

View File

@ -11,10 +11,10 @@
namespace Aurora::Loop
{
class AsyncWaiter : public Event
class AsyncWaiter : public LSEvent
{
public:
AsyncWaiter() : Event(false, false, true)
AsyncWaiter() : LSEvent(false, false, true)
{}
bool IsSignaled() override;
@ -23,7 +23,7 @@ namespace Aurora::Loop
bool AsyncWaiter::IsSignaled()
{
return Event::IsSignaled();
return LSEvent::IsSignaled();
}
ELoopSource AsyncWaiter::GetType()
@ -35,7 +35,7 @@ namespace Aurora::Loop
{
AuSPtr<ILSEvent> ret;
if (!(ret = AuStaticCast<Event>(AuMakeShared<AsyncWaiter>())))
if (!(ret = AuStaticCast<LSEvent>(AuMakeShared<AsyncWaiter>())))
{
return {};
}

View File

@ -10,36 +10,36 @@
namespace Aurora::Loop
{
Event::Event(bool triggered, bool atomicRelease, bool permitMultipleTriggers) : LSHandle((AuUInt)::CreateEventW(NULL, !atomicRelease, triggered, NULL))
LSEvent::LSEvent(bool triggered, bool atomicRelease, bool permitMultipleTriggers) : LSHandle((AuUInt)::CreateEventW(NULL, !atomicRelease, triggered, NULL))
{
}
bool Event::Set()
bool LSEvent::Set()
{
return SetEvent(reinterpret_cast<HANDLE>(this->handle));
}
bool Event::Reset()
bool LSEvent::Reset()
{
return ResetEvent(reinterpret_cast<HANDLE>(this->handle));
}
bool Event::IsSignaled()
bool LSEvent::IsSignaled()
{
return LSHandle::IsSignaled();
}
ELoopSource Event::GetType()
ELoopSource LSEvent::GetType()
{
return ELoopSource::eSourceEvent;
}
AUKN_SYM AuSPtr<ILSEvent> NewLSEvent(bool triggerd, bool atomicRelease, bool permitMultipleTriggers)
{
AuSPtr<Event> ret;
AuSPtr<LSEvent> ret;
if (!(ret = AuMakeShared<Event>(triggerd, atomicRelease, permitMultipleTriggers)))
if (!(ret = AuMakeShared<LSEvent>(triggerd, atomicRelease, permitMultipleTriggers)))
{
return {};
}

View File

@ -10,10 +10,10 @@
namespace Aurora::Loop
{
struct Event : public ILSEvent, public LSHandle
struct LSEvent : public ILSEvent, public LSHandle
{
public:
Event(bool triggered, bool atomicRelease, bool permitMultipleTriggers);
LSEvent(bool triggered, bool atomicRelease, bool permitMultipleTriggers);
bool Set() override;
bool Reset() override;

View File

@ -10,7 +10,7 @@
namespace Aurora::Loop
{
LSHandle::LSHandle(AuUInt handle) : handle(handle), reference({handle})
LSHandle::LSHandle(AuUInt handle) : handle(handle)
{}
bool LSHandle::OnTrigger(AuUInt handle)
@ -20,9 +20,19 @@ namespace Aurora::Loop
AuList<AuUInt> LSHandle::GetHandles()
{
return reference;
return {};
}
AuUInt LSHandle::GetHandle()
{
return this->handle;
}
bool LSHandle::Singular()
{
return true;
}
ELoopSource LSHandle::GetType()
{
return ELoopSource::eSourceHandle;

View File

@ -17,6 +17,9 @@ namespace Aurora::Loop
virtual bool OnTrigger(AuUInt handle) override;
virtual AuList<AuUInt> GetHandles() override;
virtual AuUInt GetHandle() override;
virtual bool Singular() override;
#if defined(AURORA_IS_POSIX_DERIVED)
virtual AuList<AuUInt> GetWriteHandles() override;
virtual AuList<AuUInt> GetSocketHandles() override;
@ -28,7 +31,5 @@ namespace Aurora::Loop
protected:
AuUInt handle;
private:
AuList<AuUInt> reference;
};
}

View File

@ -10,25 +10,25 @@
namespace Aurora::Loop
{
bool Semaphore::AddOne()
bool LSSemaphore::AddOne()
{
LONG atomicOld;
return ReleaseSemaphore(reinterpret_cast<HANDLE>(handle), 1, &atomicOld);
}
bool Semaphore::IsSignaled()
bool LSSemaphore::IsSignaled()
{
return LSHandle::IsSignaled();
}
ELoopSource Semaphore::GetType()
ELoopSource LSSemaphore::GetType()
{
return ELoopSource::eSourceSemaphore;
}
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphore(AuUInt32 initialCount)
{
AuSPtr<Semaphore> ret;
AuSPtr<LSSemaphore> ret;
auto mutex = ::CreateSemaphoreA(NULL, initialCount, AuNumericLimits<LONG>::max(), NULL);
if (mutex == INVALID_HANDLE_VALUE)
@ -37,6 +37,6 @@ namespace Aurora::Loop
return {};
}
return AuMakeShared<Semaphore>(mutex);
return AuMakeShared<LSSemaphore>(mutex);
}
}

View File

@ -10,10 +10,10 @@
namespace Aurora::Loop
{
class Semaphore : public ILSSemaphore, public LSHandle
class LSSemaphore : public ILSSemaphore, public LSHandle
{
public:
Semaphore(HANDLE handle) : LSHandle(reinterpret_cast<AuUInt>(handle))
LSSemaphore(HANDLE handle) : LSHandle(reinterpret_cast<AuUInt>(handle))
{}
bool AddOne() override;

View File

@ -44,10 +44,16 @@ namespace Aurora::Loop
if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
{
loopSourceExs.push_back(extended);
for (const auto &handle : extended->GetHandles())
if (extended->Singular())
{
auto nthandle = reinterpret_cast<HANDLE>(handle);
handleArray.push_back(nthandle);
handleArray.push_back(reinterpret_cast<HANDLE>(extended->GetHandle()));
}
else
{
for (const auto &handle : extended->GetHandles())
{
handleArray.push_back(reinterpret_cast<HANDLE>(handle));
}
}
}
}

View File

@ -37,7 +37,7 @@ namespace Aurora::Loop
bool LoopQueue::IsValid()
{
return this->hEvent_ != INVALID_HANDLE_VALUE && this->handleArray_.size();
return this->hEvent_ != INVALID_HANDLE_VALUE && this->handleArrayOr_.size() && this->handleArrayAnd_.size();
}
struct LoopJoinable
@ -78,7 +78,7 @@ namespace Aurora::Loop
{
AU_LOCK_GUARD(this->sourceMutex_);
if (!AuTryInsert(this->sources_, AuMakeTuple(source, 0)))
if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, 0, SourceCallbacks {})))
{
return false;
}
@ -90,7 +90,7 @@ namespace Aurora::Loop
{
AU_LOCK_GUARD(this->sourceMutex_);
if (!AuTryInsert(this->sources_, AuMakeTuple(source, AuTime::CurrentInternalClockMS() + maxTimeout)))
if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, AuTime::CurrentInternalClockMS() + maxTimeout, SourceCallbacks {})))
{
return false;
}
@ -101,88 +101,121 @@ namespace Aurora::Loop
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->removedSources_, source);
return AuTryInsert(this->removedSources_, source) || AuTryRemoveByTupleN<0>(this->addedSources_, source);
}
void LoopQueue::RemoveSourceNB(const AuSPtr<ILoopSource> &source, bool removeHandles)
bool LoopQueue::RemoveSourceNB(const AuSPtr<ILoopSource> &source)
{
AU_LOCK_GUARD(this->sourceMutex_); // me
AuTryRemoveByTupleN<0>(this->sources_, source); // guarded by ^
AuRemoveIf(this->loopSourceExs_, [source](const auto &re)
{
return re.sourceBase == source;
}); // guarded by the loop joinable
if (source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
return;
return {};
}
if (!removeHandles)
Iterator queueIterator(this);
bool bRebuildFromAnd {};
for ( queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
return;
}
AuUInt handleIndex {};
for (const auto &sourceEx : this->loopSourceExs_)
{
AuUInt lastHandle {};
bool wasTriggered {};
auto handles = sourceEx.source->GetHandles();
auto lsStartIndex = handleIndex;
auto delta = handles.size();
if ((handleIndex % 64) == 0)
if (queueIterator.itr->sourceBase == source)
{
handleIndex++;
if (queueIterator.itr->source)
{
auto count = queueIterator.itr->source->Singular() ? 1 : queueIterator.itr->source->GetHandles().size();
if (this->handleArrayOr_.size() <= 64)
{
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
else
{
bRebuildFromAnd = true;
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
break;
}
else
{
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
break;
}
}
handleIndex += delta;
if (sourceEx.source == source)
{
AuRemoveRange(this->handleArray_, lsStartIndex, delta);
}
queueIterator.Next();
}
return bRebuildFromAnd;
}
bool LoopQueue::Commit()
{
if (this->isCommitableInFuture_)
{
this->willCommitInFuture_ = true;
return true;
}
LoopJoinable joinable {this};
AU_LOCK_GUARD(joinable);
try
return CommitLocked();
}
void LoopQueue::BuildFromAndArray()
{
this->handleArrayOr_.clear();
for (int i = 0; i < this->handleArrayAnd_.size(); i++)
{
for (const auto &re : this->removedSources_)
if (((this->handleArrayOr_.size() % 64)) == 0)
{
RemoveSourceNB(re, this->sources_.empty());
removedSources_.clear();
this->handleArrayOr_.push_back(this->hEvent_);
}
if (this->sources_.empty() && this->removedSources_.size())
this->handleArrayOr_.push_back(this->handleArrayAnd_[i]);
}
}
bool LoopQueue::CommitLocked()
{
try
{
bool bShouldRebuild {};
for (const auto &re : this->removedSources_)
{
return true;
bShouldRebuild |= RemoveSourceNB(re);
}
removedSources_.clear();
//if (this->sources_.empty() && this->removedSources_.size())
//{
// return true;
//}
// Reset relevant OS cache
this->bIsWinLoop_ = false;
this->bIsThreadSafe_ = false;
AuTryClear(this->loopSourceExs_);
AuTryClear(this->handleArray_);
//AuTryClear(this->loopSourceExs_); // fuck we're losing shit
//AuTryClear(this->handleArray_);
// Reserve the cache arrays for initialization
this->loopSourceExs_.reserve(this->sources_.size());
this->handleArray_.reserve(this->sources_.size());
this->loopSourceExs_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
this->handleArrayAnd_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
this->handleArrayOr_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
bShouldRebuild |= this->addedSources_.size();
//
for (const auto &[source, timeout] : sources_)
for (auto &[source, timeout, callbacks] : this->addedSources_)
{
// Filter bad sources
if (!source)
@ -195,6 +228,7 @@ namespace Aurora::Loop
{
this->bIsWinLoop_ = true;
this->msgSource_ = source;
this->msgCallbacks_ = AuMove(callbacks);
continue;
}
@ -205,31 +239,37 @@ namespace Aurora::Loop
t.timeoutAbs = timeout;
t.sourceBase = source;
t.source = extended;
t.callbacks = AuMove(callbacks);
this->loopSourceExs_.push_back(t);
if ((this->handleArray_.size() % MAXIMUM_WAIT_OBJECTS) == 0)
if (extended->Singular())
{
if (this->hEvent_ != INVALID_HANDLE_VALUE)
const auto handle = extended->GetHandle();
auto nthandle = reinterpret_cast<HANDLE>(handle);
this->handleArrayAnd_.push_back(nthandle);
}
else
{
for (const auto &handle : extended->GetHandles())
{
this->bIsThreadSafe_ = true;
this->handleArray_.push_back(hEvent_);
auto nthandle = reinterpret_cast<HANDLE>(handle);
this->handleArrayAnd_.push_back(nthandle);
}
}
for (const auto &handle : extended->GetHandles())
{
auto nthandle = reinterpret_cast<HANDLE>(handle);
this->handleArray_.push_back(nthandle);
}
}
}
AuTryClear(this->addedSources_);
if (bShouldRebuild)
{
BuildFromAndArray();
}
return true;
}
catch (...)
{
AuTryClear(this->loopSourceExs_);
AuTryClear(this->handleArray_);
return {};
}
}
@ -269,7 +309,7 @@ namespace Aurora::Loop
if (source->GetType() == ELoopSource::eSourceWin32)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_, subscriber);
return AuTryInsert(this->msgCallbacks_.base, subscriber);
}
AuUInt32 count {};
@ -281,7 +321,7 @@ namespace Aurora::Loop
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.specificSubscribers, subscriber))
if (!AuTryInsert(sourceEx.callbacks.base, subscriber))
{
return {};
}
@ -289,6 +329,22 @@ namespace Aurora::Loop
count++;
}
if (!count)
{
AU_LOCK_GUARD(this->sourceMutex_);
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
{
if (innerSource == source)
{
if (!AuTryInsert(callbacks.base, subscriber))
{
return {};
}
count++;
}
}
}
return count;
}
@ -310,7 +366,8 @@ namespace Aurora::Loop
if (source->GetType() == ELoopSource::eSourceWin32)
{
return false;
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_.extended, subscriber);
}
AuUInt32 count {};
@ -322,7 +379,7 @@ namespace Aurora::Loop
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.timeoutSubscribers, subscriber))
if (!AuTryInsert(sourceEx.callbacks.extended, subscriber))
{
return {};
}
@ -330,6 +387,22 @@ namespace Aurora::Loop
count++;
}
if (!count)
{
AU_LOCK_GUARD(this->sourceMutex_);
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
{
if (innerSource == source)
{
if (!AuTryInsert(callbacks.extended, subscriber))
{
return {};
}
count++;
}
}
}
return count;
}
@ -343,15 +416,10 @@ namespace Aurora::Loop
return {};
}
// yes, this seems insane; however, the spinlock is basically free.
// disturbing the rwMutex_ would mean spuriously waking up all the Wait[...]Nb functions
// unlikely, but whats even less likely is for each sources spinlock to be held.
// this should be a work init-bound function, not wait or commit, so the vector abuse doesnt matter so much
for (auto &sourceEx : this->loopSourceExs_)
{
AU_LOCK_GUARD(sourceEx.lock); // hack lock, but its a spin zoomer
if (!AuTryInsert(sourceEx.globalSubscribers, subscriber))
if (!AuTryInsert(sourceEx.callbacks.base, subscriber))
{
return {};
}
@ -359,28 +427,32 @@ namespace Aurora::Loop
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_, subscriber);
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
{
if (!AuTryInsert(callbacks.base, subscriber))
{
return {};
}
}
}
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_.base, subscriber);
}
return true;
}
bool LoopQueue::HasFinished()
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
AuUInt32 askers {};
return WaitAnyNBSpurious(INFINITE, askers, nullptr, true) == this->sources_.size();
}
bool LoopQueue::IsSignaled()
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
AuUInt32 askers {};
return WaitAnyNBSpurious(0, askers, nullptr, true);
}
bool LoopQueue::WaitAll(AuUInt32 timeout)
{
// TODO:
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
bool bReturnStatus {true};
@ -388,16 +460,21 @@ namespace Aurora::Loop
AuUInt32 count {};
AuUInt32 index {};
if (this->handleArray_.empty())
if (this->handleArrayAnd_.empty())
{
return true;
}
count = this->handleArray_.size();
count = this->handleArrayAnd_.size();
AuUInt32 startTime = AuTime::CurrentInternalClockMS();
AuUInt32 endTime = startTime + timeout;
for (const auto &source : this->loopSourceExs_)
{
source.source->OnPresleep();
}
bool active = this->hEvent_ == INVALID_HANDLE_VALUE;
while (count != index)
@ -408,44 +485,32 @@ namespace Aurora::Loop
if (endTime <= startTime)
{
StartUserAndTakeOwn();
ConsiderEvicitingTimeoutsAll();
StartUserAndTakeOwn_Release();
return false;
}
auto timeDelta = endTime - startTime; // TODO: cap to last obj
DWORD status {};
// TODO: queue apc
if (this->bIsWinLoop_)
{
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL);
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL);
}
else
{
status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, true, timeDelta, true);
status = ::WaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, true, timeDelta, true);
}
if (WaitStatusFromAligned(status, active))
if (WaitToRetStatus(status))
{
index += count;
// continue the sleep after succesfully passing MAXIMUM_WAIT_OBJECTS or [..., EOS] objects
continue;
}
// Account for updates
if (status == WAIT_OBJECT_0 && !active)
{
count = this->handleArray_.size();
if (count == 0)
{
return true;
}
else if (index > count)
{
index = 0;
}
continue;
}
// Account for win32 failures
if (status == WAIT_FAILED)
{
@ -456,7 +521,10 @@ namespace Aurora::Loop
// Ingore the other unrelated errors (APC notification, timeout, etc)
}
StartUserAndTakeOwn();
ConsiderEvicitingTimeoutsAll();
StartUserAndTakeOwn_Release();
return bReturnStatus;
}
@ -506,7 +574,7 @@ namespace Aurora::Loop
AuUInt32 count {};
AuUInt32 &index {chuggerIndex};
count = this->handleArray_.size();
count = this->handleArrayOr_.size();
indexOfTriggered = -1;
if (index > count) index = 0;
@ -524,16 +592,16 @@ namespace Aurora::Loop
DWORD status {};
if (this->bIsWinLoop_)
{
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, this->slowTickMs_, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, this->slowTickMs_, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
if (status == next)
{
return this->handleArray_.size();
return this->handleArrayOr_.size();
}
}
else
{
status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, false, this->slowTickMs_, true);
status = ::WaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, false, this->slowTickMs_, true);
}
if (WaitStatusFromAligned(status, active))
@ -570,7 +638,7 @@ namespace Aurora::Loop
if (trigger)
{
trigger->reserve(this->sources_.size());
trigger->reserve(this->loopSourceExs_.size());
}
for (const auto &source : this->loopSourceExs_)
@ -578,7 +646,7 @@ namespace Aurora::Loop
source.source->OnPresleep();
}
if ((this->handleArray_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_))
if ((this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_))
{
status = ChugWaitAny(timeout, chuggerIndex, indexOfTriggered);
}
@ -594,11 +662,11 @@ namespace Aurora::Loop
if (this->bIsWinLoop_)
{
temp = ::MsgWaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
temp = ::MsgWaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
}
else
{
temp = ::WaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), false, sleepDelta, true);
temp = ::WaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), false, sleepDelta, true);
}
status = WaitToRetStatus(temp);
@ -610,152 +678,155 @@ namespace Aurora::Loop
}
bool isPump = this->handleArray_.size() == indexOfTriggered;
bool isPump = this->handleArrayOr_.size() == indexOfTriggered;
StartUserAndTakeOwn();
AuUInt firstTriggered {};
if (status)
{
if (!isPump)
{
firstTriggered = reinterpret_cast<AuUInt>(this->handleArray_[indexOfTriggered]);
firstTriggered = reinterpret_cast<AuUInt>(this->handleArrayOr_[indexOfTriggered]);
}
AuUInt handleIndex {};
for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; )
{
auto &source = *itr;
AuUInt lastHandle {};
bool wasTriggered {};
auto handles = source.source->GetHandles();
if ((handleIndex % 64) == 0)
Iterator queueIterator(this);
AuSInt indexOffset {};
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
handleIndex++;
}
bool shouldRemove {false};
auto &source = *queueIterator.itr;
auto lsStartIndex = handleIndex;
AuUInt lastHandle {};
bool wasTriggered {};
for (const auto &handle : handles)
{
handleIndex++;
if ((firstTriggered == handle) ||
(::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
if (source.source)
{
lastHandle = handle;
wasTriggered = true;
break;
}
}
bool singular = source.source->Singular();
bool shouldRemove {false};
if (wasTriggered && source.source->OnTrigger(lastHandle))
{
triggeredCount++;
shouldRemove = true;
AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack
for (const auto &handler : source.specificSubscribers)
if (!singular)
{
try
for (const auto &handle : source.source->GetHandles())
{
shouldRemove &= handler->OnFinished(source.source);
if ((firstTriggered == handle) ||
(::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
{
lastHandle = handle;
wasTriggered = true;
break;
}
}
catch (...)
}
else
{
auto handle = source.source->GetHandle();
if ((firstTriggered == handle) ||
(::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
{
SysPushErrorCatch();
lastHandle = firstTriggered;
wasTriggered = true;
}
}
if (shouldRemove)
{
for (const auto &handler : source.timeoutSubscribers)
if (wasTriggered && source.source->OnTrigger(lastHandle))
{
try
triggeredCount++;
shouldRemove = true;
AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack
for (const auto &handler : source.callbacks.base)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (shouldRemove)
{
shouldRemove &= handler->OnFinished(source.source);
for (const auto &handler : source.callbacks.extended)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
catch (...)
if ((source.callbacks.base.empty()) &&
(source.callbacks.extended.empty()))
{
SysPushErrorCatch();
shouldRemove = false;
}
}
}
if (shouldRemove)
{
for (const auto &handler : source.globalSubscribers)
if (trigger)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
AuTryInsert(*trigger, source.source);
}
}
if ((source.specificSubscribers.empty()) &&
(source.globalSubscribers.empty()) &&
(source.timeoutSubscribers.empty()))
if (source.ConsiderTimeout())
{
shouldRemove = false;
}
}
if (shouldRemove)
{
if (trigger)
{
AuTryInsert(*trigger, source.source);
}
}
if (source.ConsiderTimeout())
{
shouldRemove = true;
}
source.source->OnFinishSleep();
if (shouldRemove)
{
// Evict from OS cache
AuRemoveRange(this->handleArray_, lsStartIndex, handles.size());
{
AU_LOCK_GUARD(this->sourceMutex_);
AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase);
}
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
shouldRemove = true;
}
itr = this->loopSourceExs_.erase(itr);
handleIndex -= handles.size();
}
else
{
itr++;
}
source.source->OnFinishSleep();
if (shouldRemove)
{
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
SysPanic("?");
}
else
{
AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size());
int sOffset = -(uHandles);
if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS)
{
this->RemoveSourceNB(source.source);
this->willCommitInFuture_ = true;
}
else
{
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles);
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
continue;
}
}
}
//else
{
queueIterator.Next();
}
}
}
}
else
{
ConsiderEvicitingTimeoutsAll();
}
StartUserAndTakeOwn_Release();
if (isPump)
{
triggeredCount++;
@ -770,7 +841,7 @@ namespace Aurora::Loop
}
bool shouldRemove {true};
for (const auto &handler : this->msgCallbacks_)
for (const auto &handler : this->msgCallbacks_.base)
{
try
{
@ -813,47 +884,76 @@ namespace Aurora::Loop
return triggeredCount;
}
void LoopQueue::StartUserAndTakeOwn()
{
this->rwMutex_->UpgradeReadToWrite(0);
this->isCommitableInFuture_ = true;
}
void LoopQueue::StartUserAndTakeOwn_Release()
{
this->isCommitableInFuture_ = false;
if (this->willCommitInFuture_)
{
this->CommitLocked();
}
this->rwMutex_->DowngradeWriteToRead();
}
// callee must own handle array
void LoopQueue::ConsiderEvicitingTimeoutsAll()
{
AuUInt32 startIndex {};
Iterator queueIterator(this);
bool bRebuildFromAnd {};
for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; )
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
auto &source = *itr;
if ((startIndex % 64) == 0)
{
startIndex++;
}
auto count = source.source->GetHandles().size();
auto begin = startIndex;
startIndex += count;
auto &source = *queueIterator.itr;
if (!source.ConsiderTimeout())
{
itr++;
queueIterator.Next();
source.source->OnFinishSleep();
continue;
}
// Evict from OS cache
AuRemoveRange(this->handleArray_, startIndex, count);
{
AU_LOCK_GUARD(this->sourceMutex_);
AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase);
}
source.source->OnFinishSleep();
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
// Null message loop hack
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
}
else
{
auto count = source.source->Singular() ? 1 : source.source->GetHandles().size();
itr = this->loopSourceExs_.erase(itr);
if (this->handleArrayOr_.size() <= 64)
{
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
else
{
bRebuildFromAnd = true;
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
break;
}
queueIterator.Next();
}
if (bRebuildFromAnd)
{
BuildFromAndArray();
}
}

View File

@ -31,7 +31,6 @@ namespace Aurora::Loop
virtual bool Commit() override;
bool HasFinished() override;
bool IsSignaled() override;
bool WaitAll(AuUInt32 timeout) override;
@ -43,7 +42,16 @@ namespace Aurora::Loop
private:
void BuildFromAndArray();
void ConsiderEvicitingTimeoutsAll();
bool CommitLocked();
struct SourceCallbacks
{
AuList<AuSPtr<ILoopSourceSubscriber>> base;
AuList<AuSPtr<ILoopSourceSubscriberEx>> extended;
};
struct ExtendeSourceInfo
{
@ -52,16 +60,14 @@ namespace Aurora::Loop
AuSPtr<ILoopSourceEx> source;
AuSPtr<ILoopSource> sourceBase;
AuThreadPrimitives::SpinLock lock; // im too brain dead to think of a solution to the AddCallback thread safety issue, so i'm going to optimize Wait[...]s subscriber lookup with filtered ccaches in here, protected by this spinlock
AuList<AuSPtr<ILoopSourceSubscriber>> globalSubscribers;
AuList<AuSPtr<ILoopSourceSubscriber>> specificSubscribers;
AuList<AuSPtr<ILoopSourceSubscriberEx>> timeoutSubscribers;
AuUInt32 timeoutAbs;
SourceCallbacks callbacks;
bool ConsiderTimeout() const
{
if ((timeoutAbs) && (AuTime::CurrentInternalClockMS() >= timeoutAbs))
{
for (const auto &handler : timeoutSubscribers)
for (const auto &handler : callbacks.extended)
{
try
{
@ -80,29 +86,114 @@ namespace Aurora::Loop
}
};
bool IsValid();
void RemoveSourceNB(const AuSPtr<ILoopSource> &source, bool removeHandles);
bool RemoveSourceNB(const AuSPtr<ILoopSource> &source);
bool WaitAnyNBSpurious(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll);
bool ChugWaitAny(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuUInt32 &offset);
void StartUserAndTakeOwn();
void StartUserAndTakeOwn_Release();
//
AuList<AuSPtr<ILoopSourceSubscriber>> msgCallbacks_;
AuList<AuSPtr<ILoopSource>> removedSources_;
AuThreadPrimitives::SpinLock sourceMutex_;
AuList<AuTuple<AuSPtr<ILoopSource>, AuUInt32>> sources_;
SourceCallbacks msgCallbacks_;
AuList<AuSPtr<ILoopSource>> removedSources_;
AuThreadPrimitives::SpinLock sourceMutex_;
AuList<AuTuple<AuSPtr<ILoopSource>, AuUInt32, SourceCallbacks>> addedSources_;
//
// OS Cache
AuThreadPrimitives::RWLockUnique_t rwMutex_ = AuThreadPrimitives::RWLockUnique();
bool bIsWinLoop_ {};
bool bIsThreadSafe_ {};
AuList<HANDLE> handleArray_;
AuList<HANDLE> handleArrayOr_;
AuList<HANDLE> handleArrayAnd_;
AuUInt32 slowTickMs_ {1};
AuUInt32 slowTickRate_ {MAXIMUM_WAIT_OBJECTS};
bool forceChug_ {};
AuList<ExtendeSourceInfo> loopSourceExs_;
bool isCommitableInFuture_ {};
bool willCommitInFuture_ {};
AuSPtr<ILoopSource> msgSource_;
HANDLE hEvent_ {INVALID_HANDLE_VALUE};
AuList<ExtendeSourceInfo> loopSourceExs_;
bool bLargeCommitQueueDirty {};
struct Iterator
{
LoopQueue *base;
inline Iterator(LoopQueue *base) : base(base)
{
}
inline auto End()
{
return this->base->loopSourceExs_.end();
}
inline void Start()
{
this->startingIndexAnd = 0;
this->startingIndexOr = 1;
itr = this->base->loopSourceExs_.begin();
}
inline void Next()
{
Increment();
itr++;
}
template<class T>
inline void Delete(T &&itr)
{
this->itr = AuMove(itr);
}
inline void Increment()
{
if (itr != End())
{
if (itr->source->Singular())
{
startingIndexOr++;
startingIndexAnd++;
if ((startingIndexOr % 64) == 0)
{
startingIndexOr++;
}
}
else
{
// TODO: this is gross and slow
for (int i = 0; i < itr->source->GetHandles().size(); i++)
{
startingIndexOr += i;
startingIndexAnd += i;
if ((startingIndexOr % 64) == 0)
{
startingIndexOr++;
}
}
}
}
}
AuUInt startingIndexOr {};
AuUInt startingIndexAnd {};
AuToIterator_t<AuList<ExtendeSourceInfo>> itr;
};
friend struct Iteartor;
};
}

View File

@ -19,6 +19,7 @@ namespace Aurora::Loop
if (handles.size() == 1)
{
one = 0;
return WaitForSingleObjectEx(reinterpret_cast<HANDLE>(handles.at(0)), 0, true) == WAIT_OBJECT_0;
}
else
@ -29,10 +30,23 @@ namespace Aurora::Loop
{
ntHandles.push_back(reinterpret_cast<HANDLE>(handle));
}
return WaitForMultipleObjectsEx(ntHandles.size(), ntHandles.data(), false, 0, true) >= WAIT_OBJECT_0;
auto idx = WaitForMultipleObjectsEx(ntHandles.size(), ntHandles.data(), false, 0, true);
if (idx < WAIT_OBJECT_0)
{
return false;
}
one = handles[idx];
return true;
}
}
bool WaitSingleGeneric::WaitForOne(AuUInt handle)
{
return WaitForSingleObjectEx(reinterpret_cast<HANDLE>(handle), 0, true) == WAIT_OBJECT_0;
}
void WaitSingleGeneric::OnPresleep()
{

View File

@ -12,6 +12,7 @@ namespace Aurora::Loop
struct WaitSingleGeneric : public WaitSingleBase
{
virtual bool WaitForAtleastOne(const AuList<AuUInt> &handles, AuUInt &one) override;
virtual bool WaitForOne(AuUInt handle) override;
virtual void OnPresleep() override;
virtual void OnFinishSleep() override;
virtual ELoopSource GetType() override;

View File

@ -12,12 +12,29 @@ namespace Aurora::Loop
{
bool WaitSingleBase::IsSignaled()
{
bool val {};
AuUInt one {};
this->OnPresleep();
auto handles = this->GetHandles();
auto val = WaitForAtleastOne(handles, one);
if (val)
val = this->OnTrigger(one);
if (this->Singular())
{
auto handle = this->GetHandle();
auto val = this->WaitForOne(handle);
if (val)
{
val = this->OnTrigger(handle);
}
}
else
{
auto handles = this->GetHandles();
auto val = WaitForAtleastOne(handles, one);
if (val)
{
val = this->OnTrigger(one);
}
}
this->OnFinishSleep();
return val;
}

View File

@ -16,5 +16,6 @@ namespace Aurora::Loop
bool IsSignaled() override;
virtual bool WaitForAtleastOne(const AuList<AuUInt> &handles, AuUInt &one) = 0;
virtual bool WaitForOne(AuUInt handle) = 0;
};
}