AuroraRuntime/Source/IO/Loop/LoopQueue.NT.cpp

1365 lines
40 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: LoopQueue.NT.cpp
Date: 2022-1-8
Author: Reece
Note: Win32 sewage lmao
***/
#include <Source/RuntimeInternal.hpp>
#include "Loop.NT.hpp"
#include "ILoopSourceEx.hpp"
#include "LSWin32.NT.hpp"
#include "LoopQueue.NT.hpp"
// Translates a Win32 wait return code into "did a waitable object complete the wait?".
// Timeouts, APC wake-ups (WAIT_IO_COMPLETION) and WAIT_FAILED are all reported as false;
// every other code is reported as true.
static bool WaitToRetStatus(DWORD ret)
{
    switch (ret)
    {
    case WAIT_TIMEOUT:
    case WAIT_IO_COMPLETION:
    case WAIT_FAILED:
        return false;
    default:
        return true;
    }
}
// Same as WaitToRetStatus, except that a wake-up on slot zero (WAIT_OBJECT_0, the
// "lock releaser" slot) only counts as a real trigger when `active` is set.
static bool WaitStatusFromAligned(DWORD ret, bool active)
{
    if (!WaitToRetStatus(ret))
    {
        return false;
    }

    if (active)
    {
        return true;
    }

    return ret != WAIT_OBJECT_0 /*lock releaser*/;
}
namespace Aurora::IO::Loop
{
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Loop Queue :: Extended Handle
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Wraps an extended loop source. timeoutAbs starts at zero, which ConsiderTimeout
// treats as "no timeout configured".
LoopQueue::ExtendedSourceInfo::ExtendedSourceInfo(const AuSPtr<ILoopSourceEx> &in) : source(in), timeoutAbs(0)
{
}
// Checks whether this source's absolute deadline has passed.
//
// time: the current time, in the same units as timeoutAbs.
// Returns false when no deadline is set (timeoutAbs == 0) or it has not been reached yet.
// Otherwise every extended subscriber receives OnTimeout (exceptions are caught and
// pushed to the error stack) and true is returned.
bool LoopQueue::ExtendedSourceInfo::ConsiderTimeout(AuUInt64 time) const
{
    const bool bHasDeadline = this->timeoutAbs != 0;
    const bool bExpired     = bHasDeadline && !(time < this->timeoutAbs);

    if (!bExpired)
    {
        return false;
    }

    for (const auto &subscriber : callbacks.extended)
    {
        try
        {
            subscriber->OnTimeout(sourceBase);
        }
        catch (...)
        {
            SysPushErrorCatch();
        }
    }

    return true;
}
// Dispatches a completed wait on this source to its subscribers.
//
// queue:  owning queue; its global subscribers are consulted last.
// handle: the native handle that fired, or -1 when the caller has already run
//         OnTrigger itself (in which case OnTrigger is skipped here).
//
// Returns {ticked, shouldRemove}:
//  - {false, false} when the source's OnTrigger rejects the wake-up.
//  - Otherwise ticked is true, and shouldRemove is true only if every per-source
//    subscriber reported it was finished AND no global subscriber objected.
//
// Subscribers whose OnFinished returns true are erased from the callback lists.
// A subscriber that throws is treated as "not finished" and is kept. The result
// flag used to be left uninitialised on that path, which was undefined behaviour.
AuPair<bool, bool> LoopQueue::ExtendedSourceInfo::DoWork(LoopQueue *queue, AuUInt handle)
{
    bool bShouldRemove {true};
    AuUInt8 uPosition {};

    if (this->source && handle != -1)
    {
        if (!this->source->OnTrigger(handle))
        {
            return {};
        }
    }

    bool bOverload {};

    {
        AU_LOCK_GUARD(&this->lock);

        // A source that starts with no subscribers at all must not be evicted just
        // because "all zero subscribers finished".
        if ((this->callbacks.base.empty()) &&
            (this->callbacks.extended.empty()))
        {
            bOverload = true;
        }

        // Notify extended callbacks...
        for (auto itr = this->callbacks.extended.begin();
             itr != this->callbacks.extended.end(); )
        {
            bool result {}; // stays false if the handler throws
            auto handler = *itr;

            try
            {
                result = handler->OnFinished(this->sourceBase, uPosition++);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }

            bShouldRemove &= result;

            if (result)
            {
                itr = this->callbacks.extended.erase(itr);
            }
            else
            {
                itr++;
            }
        }

        // Notify basic callbacks...
        for (auto itr = this->callbacks.base.begin();
             itr != this->callbacks.base.end(); )
        {
            bool result {}; // stays false if the handler throws
            auto handler = *itr;

            try
            {
                result = handler->OnFinished(this->sourceBase);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }

            bShouldRemove &= result;

            if (result)
            {
                itr = this->callbacks.base.erase(itr);
            }
            else
            {
                itr++;
            }
        }
    }

    // Evict when subs count hit zero, not when sub count started off at zero
    if (bOverload)
    {
        bShouldRemove = false;
    }

    // Notify global subscribers, allowing them to preempt removal
    if (bShouldRemove || bOverload)
    {
        AU_LOCK_GUARD(queue->sourceMutex_);
        for (const auto &handler : queue->globalSubcribers_)
        {
            try
            {
                bShouldRemove &= handler->OnFinished(this->sourceBase);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }
        }
    }

    return AuMakePair(true, bShouldRemove);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Loop Queue
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Creates the two manual-reset events used by the queue:
//  - hEvent_: initially non-signalled; raised by Sync/Sync2 to kick waiters out of
//             their wait so the write lock can be taken.
//  - hDummy_: initially signalled; swapped into the handle arrays as a placeholder.
//
// CreateEventW reports failure by returning NULL, whereas the rest of this file tests
// hEvent_ against INVALID_HANDLE_VALUE. A failed creation is normalised to that
// sentinel so those checks (IsValid, Sync, Sync2, ChugWaitAny) actually observe it.
LoopQueue::LoopQueue()
{
    this->hEvent_ = ::CreateEventW(NULL, true, false, NULL);
    if (!this->hEvent_)
    {
        this->hEvent_ = INVALID_HANDLE_VALUE;
    }

    this->hDummy_ = ::CreateEventW(NULL, true, true, NULL);
}
// Releases the two event handles created by the constructor.
LoopQueue::~LoopQueue()
{
AuWin32CloseHandle(this->hEvent_);
AuWin32CloseHandle(this->hDummy_);
}
// No deferred initialisation is performed here; always reports success.
bool LoopQueue::Init()
{
return true;
}
// The queue is considered valid when the sync event exists and both handle arrays
// (the wait-any "or" array and the wait-all "and" array) are non-empty.
bool LoopQueue::IsValid()
{
    if (this->hEvent_ == INVALID_HANDLE_VALUE)
    {
        return false;
    }

    if (!this->handleArrayOr_.size())
    {
        return false;
    }

    return bool(this->handleArrayAnd_.size());
}
// Adapter that exposes LoopQueue::Sync / Sync2 / Unlock under Lock / TryLock / Unlock
// names, so a queue can be handed to the lock-guard macros (see LoopQueue::Commit).
struct LoopJoinable
{
// Queue being locked; not owned.
LoopQueue *that;
// Blocking acquire: forwards to LoopQueue::Sync.
void Lock()
{
that->Sync();
}
// Non-blocking acquire: forwards to LoopQueue::Sync2 and returns its result.
bool TryLock()
{
return that->Sync2();
}
// Release: forwards to LoopQueue::Unlock.
void Unlock()
{
that->Unlock();
}
};
// Takes the queue's write lock, blocking until it is available.
// When the sync event exists it is raised first so that threads parked in a wait are
// woken, and it is lowered again once the write lock is held.
void LoopQueue::Sync()
{
    const bool bHasEvent = this->hEvent_ != INVALID_HANDLE_VALUE;

    if (bHasEvent)
    {
        ::SetEvent(this->hEvent_);
    }

    this->rwMutex_->AsWritable()->Lock();

    if (bHasEvent)
    {
        ::ResetEvent(this->hEvent_);
    }
}
// Non-blocking variant of Sync.
// Raises the sync event (if present) and tries the write lock once. On success the
// event is lowered again and true is returned. On failure the event is left raised
// and false is returned.
bool LoopQueue::Sync2()
{
    const bool bHasEvent = this->hEvent_ != INVALID_HANDLE_VALUE;

    if (bHasEvent)
    {
        ::SetEvent(this->hEvent_);
    }

    if (this->rwMutex_->AsWritable()->TryLock())
    {
        if (bHasEvent)
        {
            ::ResetEvent(this->hEvent_);
        }
        return true;
    }

    return false;
}
// Releases the write lock taken by Sync / Sync2.
void LoopQueue::Unlock()
{
this->rwMutex_->AsWritable()->Unlock();
}
// Queues `source` for addition with no timeout (a zero deadline) and no callbacks.
// The source only becomes part of the wait set at the next commit.
// Returns false if the pending-add list could not take the entry.
bool LoopQueue::SourceAdd(const AuSPtr<ILoopSource> &source)
{
    AU_LOCK_GUARD(this->sourceMutex_);
    return AuTryInsert(this->addedSources_, AuMakeTuple(source, 0, SourceCallbacks {}));
}
// Queues `source` for addition with an absolute deadline of "now + maxTimeout",
// measured on the millisecond steady clock. Takes effect at the next commit.
// Returns false if the pending-add list could not take the entry.
bool LoopQueue::SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 maxTimeout)
{
    AU_LOCK_GUARD(this->sourceMutex_);
    return AuTryInsert(this->addedSources_,
                       AuMakeTuple(source, AuTime::SteadyClockMS() + maxTimeout, SourceCallbacks {}));
}
// Queues `source` for removal at the next commit.
// Because of the short-circuit `||`, the pending-add list is only searched when the
// insertion into removedSources_ fails.
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->removedSources_, source) || AuTryRemoveByTupleN<0>(this->addedSources_, source);
}
bool LoopQueue::RemoveSourceNB(const AuSPtr<ILoopSource> &source)
{
if (source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
return {};
}
bool bRebuildFromAnd {};
Iterator queueIterator(this);
for (queueIterator.Start();
queueIterator.End() != queueIterator.itr; )
{
if (queueIterator.itr->sourceBase == source)
{
if (queueIterator.itr->source)
{
auto count = queueIterator.itr->source->Singular() ? 1 : queueIterator.itr->source->GetHandles().size();
if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS)
{
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
else
{
bRebuildFromAnd = true;
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
break;
}
else
{
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
break;
}
}
queueIterator.Next();
}
return bRebuildFromAnd;
}
// Applies the pending SourceAdd / SourceRemove requests to the live wait set.
//
// While a dispatch pass owns the queue (isCommitableInFuture_ is set by
// StartUserAndTakeOwn), the commit is deferred: willCommitInFuture_ is raised, true is
// returned, and StartUserAndTakeOwn_Release performs the commit.
// Otherwise the queue is try-locked through LoopJoinable and CommitLocked runs.
bool LoopQueue::Commit()
{
if (this->isCommitableInFuture_)
{
this->willCommitInFuture_ = true;
return true;
}
LoopJoinable joinable {this};
// NOTE(review): presumably returns a default (false) value when the try-lock fails; confirm against the macro.
AU_TRY_LOCK_GUARD_RET_DEF(joinable);
return CommitLocked();
}
// Regenerates the wait-any ("or") handle array from the wait-all ("and") array.
// The sync event hEvent_ is inserted whenever the output length is a multiple of
// MAXIMUM_WAIT_OBJECTS, so each chunk handed to a wait call begins with that event.
void LoopQueue::BuildFromAndArray()
{
    auto &orHandles = this->handleArrayOr_;

    orHandles.clear();
    orHandles.reserve(this->handleArrayAnd_.size() + 16);

    for (const auto &handle : this->handleArrayAnd_)
    {
        const bool bAtChunkStart = (orHandles.size() % MAXIMUM_WAIT_OBJECTS) == 0;
        if (bAtChunkStart)
        {
            orHandles.push_back(this->hEvent_);
        }

        orHandles.push_back(handle);
    }
}
// Applies pending removals and additions to the live arrays. No lock is taken here.
// NOTE(review): presumably the caller holds the write lock (Commit takes it via
// LoopJoinable; StartUserAndTakeOwn_Release calls this before downgrading); confirm.
//
// Sequence: process removedSources_, append addedSources_, process removedSources_
// again (so a source added and removed in the same batch is dropped), clear both
// pending lists, then rebuild the "or" array if anything changed.
//
// Returns true on success, or false if anything inside threw.
bool LoopQueue::CommitLocked()
{
try
{
bool bShouldRebuild {};
// First removal pass: sources that are already live.
for (const auto &re : this->removedSources_)
{
bShouldRebuild |= this->RemoveSourceNB(re);
}
// Reserve the cache arrays for initialization
this->loopSourceExs_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
this->handleArrayAnd_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
this->handleArrayOr_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
// Any addition forces the "or" array to be regenerated.
bShouldRebuild |= bool(this->addedSources_.size());
// Promote each pending source into the live set.
for (auto &[source, timeout, callbacks] : this->addedSources_)
{
// Filter bad sources
if (!source)
{
continue;
}
// Win32 edge case
// The message-loop source owns no handle; it is tracked through msgSource_ / msgCallbacks_ instead.
if (source->GetType() == ELoopSource::eSourceWin32)
{
this->bIsWinLoop_ = true;
this->msgSource_ = source;
this->msgCallbacks_ = AuMove(callbacks);
continue;
}
// Handle known ILoopSourceEx handle objects
// Sources that do not cast to ILoopSourceEx are silently skipped.
if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
{
ExtendedSourceInfo t {extended};
t.timeoutAbs = timeout;
t.sourceBase = source;
t.source = extended;
t.callbacks = AuMove(callbacks);
this->loopSourceExs_.push_back(t);
// Append the source's handle(s) to the wait-all array.
if (extended->Singular())
{
const auto handle = extended->GetHandle();
auto nthandle = reinterpret_cast<HANDLE>(handle);
this->handleArrayAnd_.push_back(nthandle);
}
else
{
for (const auto &handle : extended->GetHandles())
{
auto nthandle = reinterpret_cast<HANDLE>(handle);
this->handleArrayAnd_.push_back(nthandle);
}
}
}
}
// Second removal pass: catches sources that were only just added above.
for (const auto &re : this->removedSources_)
{
bShouldRebuild |= this->RemoveSourceNB(re);
}
AuTryClear(this->addedSources_);
AuTryClear(this->removedSources_);
if (bShouldRebuild)
{
BuildFromAndArray();
}
return true;
}
catch (...)
{
// Any exception (e.g. from a container operation) is reported as a failed commit.
return {};
}
}
// Number of committed extended sources. The Win32 message source is tracked
// separately (msgSource_) and is not counted here.
AuUInt32 LoopQueue::GetSourceCount()
{
return this->loopSourceExs_.size();
}
// Configures the chunked ("chug") wait path.
// sectionTickTime:   per-chunk wait in milliseconds; zero is replaced by 1.
// sectionDequeCount: stored in slowTickRate_, capped at MAXIMUM_WAIT_OBJECTS.
void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount)
{
this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS));
this->slowTickMs_ = sectionTickTime ? sectionTickTime : 1;
}
// When set, WaitAnyNBSpurious uses the chunked ChugWaitAny path even if the handle
// array would fit in a single wait call.
void LoopQueue::ChugHint(bool value)
{
this->forceChug_ = value;
}
bool LoopQueue::AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.callbacks.base, subscriber))
{
return {};
}
count++;
}
if (!count)
{
AU_LOCK_GUARD(this->sourceMutex_);
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
{
if (innerSource == source)
{
if (!AuTryInsert(callbacks.base, subscriber))
{
return {};
}
count++;
}
}
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_.base, subscriber);
}
return count;
}
bool LoopQueue::AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
// IMPORTANT: Soft-locks on this line, whose stack traces contain ExtendedSourceInfo::DoWork, are
// a re-add during io-tick before removal condition.
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.callbacks.extended, subscriber))
{
return {};
}
count++;
}
if (!count)
{
AU_LOCK_GUARD(this->sourceMutex_);
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
{
if (innerSource == source)
{
if (!AuTryInsert(callbacks.extended, subscriber))
{
return {};
}
count++;
}
}
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_.extended, subscriber);
}
return count;
}
// Registers a global subscriber. DoWork consults these after a source's own
// subscribers, and TryPumpWin32 notifies them for the Win32 message source.
// Returns false (and pushes an argument error) when `subscriber` is null, or when the
// insertion fails.
bool LoopQueue::AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
AU_LOCK_GUARD(rwMutex_->AsReadable());
if (!subscriber)
{
SysPushErrorArg();
return {};
}
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->globalSubcribers_, subscriber);
}
// Zero-timeout check in "poll" mode: WaitAnyNBSpurious is asked whether anything is
// signalled without dispatching the sources' subscribers (poll == true).
bool LoopQueue::IsSignaledPeek()
{
AuUInt32 askers {};
bool bAskers {};
return WaitAnyNBSpurious(0, askers, nullptr, true, bAskers);
}
// Waits for the handles in the wait-all ("and") array, in chunks of at most
// MAXIMUM_WAIT_OBJECTS, then dispatches DoWork on every committed source.
//
// timeout: milliseconds; when non-zero, a deadline is enforced between chunk waits.
//
// Returns true immediately when there is nothing to wait on. Returns false when the
// deadline expires (after evicting timed-out sources). Otherwise returns whether any
// source reported a tick.
bool LoopQueue::WaitAll(AuUInt32 timeout)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
bool bReturnStatus {false};
bool bTimeout {false};
AuUInt32 count {};
AuUInt32 index {};
if (this->handleArrayAnd_.empty())
{
return true;
}
count = this->handleArrayAnd_.size();
AuUInt64 startTime = AuTime::SteadyClockNS();
AuUInt64 endTime = startTime + AuMSToNS<AuUInt64>(timeout);
for (const auto &source : this->loopSourceExs_)
{
source.source->OnPresleep();
}
// Chunked wait: `index` is the first handle not yet covered.
while (count != index)
{
auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));
startTime = AuTime::SteadyClockNS();
if (timeout)
{
if (endTime <= startTime)
{
// Deadline passed: evict timed-out sources under ownership and give up.
StartUserAndTakeOwn();
ConsiderEvicitingTimeoutsAll();
StartUserAndTakeOwn_Release();
return false;
}
}
// NOTE(review): when timeout == 0 the deadline check above is skipped, yet endTime - startTime
// is still evaluated; endTime may then be behind startTime and the unsigned subtraction wraps. Confirm intent.
auto timeDelta = AuNSToMS<AuUInt32>(endTime - startTime); // TODO: cap to last obj
DWORD status {};
// Alertable wait; retried whenever it is interrupted by an APC.
do
{
if (this->bIsWinLoop_)
{
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL);
}
else
{
status = ::WaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, true, timeDelta, true);
}
} while (status == WAIT_IO_COMPLETION);
// WAIT_OBJECT_0 + next: pump messages and retry the same chunk.
if (status == WAIT_OBJECT_0 + next)
{
TryPumpWin32();
continue;
}
if (WaitToRetStatus(status))
{
// NOTE(review): advances by (status + 1) rather than by `next`, although this is a wait-all call; confirm intent.
index += (status + 1);
continue;
}
// Account for win32 failures
if (status == WAIT_FAILED)
{
// NOTE(review): after this break the sources are still dispatched below as if all handles were signalled.
bReturnStatus = false;
break;
}
// Ignore the other unrelated errors (APC notification, timeout, etc)
}
// OK - All signals are set
// Take ownership of the queue ready for a potential purge of objects
StartUserAndTakeOwn();
// Le great iterate
Iterator queueIterator(this);
AuSInt indexOffset {};
auto now = AuTime::SteadyClockMS();
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
auto &source = *queueIterator.itr;
// Dispatch with the source's (first) handle; OnTrigger runs inside DoWork.
auto [ticked, bShouldRemove] = source.DoWork(this, source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]);
// A source that wants to stay is still evicted once its deadline has passed.
if (!bShouldRemove && source.ConsiderTimeout(now))
{
bShouldRemove = true;
}
source.source->OnFinishSleep();
if (ticked)
{
bReturnStatus |= true;
}
// Restore the real handle if a poll pass (WaitAnyNBSpurious with poll == true) swapped in hDummy_.
if (this->handleArrayOr_[queueIterator.startingIndexOr] == AuReinterpretCast<HANDLE>(this->hDummy_))
{
auto handle = AuReinterpretCast<HANDLE>(source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]);
this->handleArrayAnd_[queueIterator.startingIndexAnd] = handle;
this->handleArrayOr_[queueIterator.startingIndexOr] = handle;
}
if (bShouldRemove)
{
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
SysPanic("?");
}
else
{
AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size());
int sOffset = -((int)uHandles);
if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS)
{
// NOTE(review): RemoveSourceNB erases from loopSourceExs_ while queueIterator is still live, its
// "needs rebuild" result is discarded, and Next() is then called below; verify iterator validity.
this->RemoveSourceNB(source.source);
this->willCommitInFuture_ = true;
}
else
{
// indexOffset is never modified in this function, so these offsets are always +0.
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles);
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
continue;
}
}
}
//else
{
queueIterator.Next();
}
}
StartUserAndTakeOwn_Release();
return bReturnStatus;
}
// Waits until at least one source fires or the timeout elapses.
//
// timeout: milliseconds; zero means "no deadline" (wait indefinitely).
//
// Returns the number of wait passes that reported a trigger. The pass is repeated
// while the sync event hEvent_ is raised, i.e. while another thread kicked us out of
// the wait in order to take the write lock.
//
// The deadline is expressed on the millisecond steady clock because that is what
// WaitAnyNBSpurious / ChugWaitAny compare it against. Building it from the nanosecond
// clock, as was done previously, produced a deadline that could never be reached.
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout)
{
    AuUInt32 ret {};
    const AuUInt64 endTime = timeout ? (AuUInt64(AuTime::SteadyClockMS()) + timeout) : AuUInt64(-1);
    AuUInt32 chuggerIndex {};
    bool bTriggerWin32 {};

    do
    {
        if (WaitAnyNBSpurious(endTime, chuggerIndex, nullptr, false, bTriggerWin32))
        {
            ret++;
        }

        PumpHooks();
    }
    while (!bTriggerWin32 &&
           WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0);

    return ret;
}
// Like WaitAny, but returns the sources that were dispatched.
//
// timeout: milliseconds; zero means "no deadline" (wait indefinitely).
//
// Returns as soon as a pass reports a trigger (epilogue hooks are not pumped for that
// pass). Otherwise the pass is repeated while the sync event hEvent_ is raised.
//
// The deadline is expressed on the millisecond steady clock because that is what
// WaitAnyNBSpurious / ChugWaitAny compare it against. Building it from the nanosecond
// clock, as was done previously, produced a deadline that could never be reached.
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeout)
{
    AuList<AuSPtr<ILoopSource>> trigger;
    const AuUInt64 endTime = timeout ? (AuUInt64(AuTime::SteadyClockMS()) + timeout) : AuUInt64(-1);
    AuUInt32 chuggerIndex {};
    bool bTriggerWin32 {};

    do
    {
        if (WaitAnyNBSpurious(endTime, chuggerIndex, &trigger, false, bTriggerWin32))
        {
            return trigger;
        }

        PumpHooks();
    }
    while (!bTriggerWin32 &&
           WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0);

    return trigger;
}
// Zero-timeout variant of WaitAny: dispatches whatever is already signalled.
// Returns the number of passes that reported a trigger. Passes repeat while the sync
// event hEvent_ is raised.
AuUInt32 LoopQueue::PumpNonblocking()
{
    AuUInt32 uTriggeredPasses {};
    AuUInt32 uChugCursor {};
    bool bWin32Fired {};

    for (;;)
    {
        if (WaitAnyNBSpurious(0, uChugCursor, nullptr, false, bWin32Fired))
        {
            uTriggeredPasses++;
        }

        PumpHooks();

        if (bWin32Fired)
        {
            break;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return uTriggeredPasses;
}
// Zero-timeout variant of WaitAnyEx: returns the sources dispatched by the first pass
// that reports a trigger (hooks are not pumped for that pass). Passes repeat while the
// sync event hEvent_ is raised.
AuList<AuSPtr<ILoopSource>> LoopQueue::PumpNonblockingEx()
{
    AuList<AuSPtr<ILoopSource>> fired;
    AuUInt32 uChugCursor {};
    bool bWin32Fired {};

    for (;;)
    {
        if (WaitAnyNBSpurious(0, uChugCursor, &fired, false, bWin32Fired))
        {
            break;
        }

        PumpHooks();

        if (bWin32Fired)
        {
            break;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return fired;
}
// Chunked wait-any over handleArrayOr_, used when the array exceeds
// MAXIMUM_WAIT_OBJECTS entries or when ChugHint forces this path.
//
// internalEndTime:  absolute deadline on the millisecond steady clock; 0 or
//                   AuUInt64(-1) disables the deadline check.
// chuggerIndex:     in/out cursor into handleArrayOr_ (aliased below as `index`).
// indexOfTriggered: out; index into handleArrayOr_ of the handle that fired, or
//                   handleArrayOr_.size() for Win32 message input; -1 when nothing fired.
//
// Returns true when a handle other than the chunk's leading "lock releaser" fired
// (or message input arrived); otherwise false.
bool LoopQueue::ChugWaitAny(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered)
{
AuUInt32 count {};
AuUInt32 &index {chuggerIndex};
count = this->handleArrayOr_.size();
indexOfTriggered = -1;
if (index > count) index = 0;
// Without a sync event, slot zero is treated as an ordinary source handle.
bool active = this->hEvent_ == INVALID_HANDLE_VALUE;
while (count != index)
{
auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));
// Each chunk is waited on for at most slowTickMs_, clamped to the remaining time.
auto sleepMS = this->slowTickMs_;
if (internalEndTime && internalEndTime != AuUInt64(-1))
{
auto now = AuTime::SteadyClockMS();
auto delta = AuInt64(internalEndTime) - AuInt64(now);
if (delta <= 0)
{
return false;
}
sleepMS = AuMin(sleepMS, AuUInt32(delta));
}
DWORD status {};
if (this->bIsWinLoop_)
{
// Alertable wait that also wakes on message input; retried on APC interruption.
do
{
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, sleepMS, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
}
while (status == WAIT_IO_COMPLETION);
// A status equal to `next` is treated as "message input available".
if (status == next)
{
indexOfTriggered = this->handleArrayOr_.size();
return true;
}
}
else
{
do
{
status = ::WaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, false, sleepMS, true);
}
while (status == WAIT_IO_COMPLETION);
}
if (WaitStatusFromAligned(status, active))
{
// Convert the chunk-relative slot into an index into the whole array.
DWORD offset = status - WAIT_OBJECT_0;
indexOfTriggered = offset + index;
return true;
}
// Slot zero fired while it is the sync event: another thread wants the lock.
if (status == WAIT_OBJECT_0 && !active)
{
return false;
}
if (status == WAIT_FAILED)
{
return false;
}
// NOTE(review): the cursor advances by `count` rather than `next`, so from index 0 the loop ends after
// the first chunk, and from a non-zero index it steps past `count`; confirm this is intended.
index += count;
}
return false;
}
// Core wait-any pass: waits on handleArrayOr_ (possibly waking spuriously because of
// the sync event), then walks every committed source and dispatches those that fired.
//
// internalEndTime: 0 means poll (zero timeout); AuUInt64(-1) means wait forever; anything
//                  else is an absolute deadline on the millisecond steady clock.
// chuggerIndex:    cursor forwarded to ChugWaitAny on the chunked path.
// trigger:         optional out list that receives the sources that were dispatched.
// poll:            when true, fired sources are not dispatched; their handle slots are
//                  swapped for hDummy_ and remembered in handleArrayTainted_ so that a
//                  later non-poll pass picks them up.
// bTriggerWin32:   not written anywhere in this function.
//
// Returns true when triggeredCount is non-zero (the count is converted to bool).
bool LoopQueue::WaitAnyNBSpurious(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll, bool &bTriggerWin32)
{
bool status {};
DWORD temp;
AuUInt32 indexOfTriggered {};
AuUInt32 triggeredCount {};
AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me
if (trigger)
{
trigger->reserve(this->loopSourceExs_.size());
}
for (const auto &source : this->loopSourceExs_)
{
source.source->OnPresleep();
}
// Too many handles for one wait call (or chugging was requested): use the chunked path.
if ((this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_))
{
status = ChugWaitAny(internalEndTime, chuggerIndex, indexOfTriggered);
}
else
{
// Translate the absolute deadline into a relative Win32 timeout.
AuUInt32 sleepDelta {};
if (internalEndTime)
{
if (internalEndTime == AuUInt64(-1))
{
sleepDelta = INFINITE;
}
else
{
auto now = AuTime::SteadyClockMS();
if (internalEndTime <= now)
{
return false;
}
sleepDelta = internalEndTime - now;
}
}
else
{
sleepDelta = 0;
}
// Alertable wait; retried whenever it is interrupted by an APC.
do
{
if (this->bIsWinLoop_)
{
temp = ::MsgWaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
}
else
{
temp = ::WaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), false, sleepDelta, true);
}
} while (temp == WAIT_IO_COMPLETION);
status = WaitToRetStatus(temp);
if (status)
{
// NOTE(review): WaitToRetStatus also accepts abandoned-wait codes, for which this offset would fall
// outside handleArrayOr_; verify that no mutex-like handles can end up in the array.
indexOfTriggered = temp - WAIT_OBJECT_0;
}
}
// An index equal to the array size denotes Win32 message input rather than a handle.
if (indexOfTriggered == this->handleArrayOr_.size())
{
TryPumpWin32();
triggeredCount++;
}
else
{
if (TryPumpWin32())
{
triggeredCount++;
}
}
bool isPump = this->handleArrayOr_.size() == indexOfTriggered;
// Upgrade to exclusive ownership: the arrays may be edited while dispatching.
StartUserAndTakeOwn();
AuUInt firstTriggered {};
if (status)
{
if (!isPump)
{
firstTriggered = reinterpret_cast<AuUInt>(this->handleArrayOr_[indexOfTriggered]);
}
{
Iterator queueIterator(this);
AuSInt indexOffset {};
auto now = AuTime::SteadyClockMS();
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
bool shouldRemove {false};
auto &source = *queueIterator.itr;
AuUInt lastHandle {};
bool wasTriggered {};
if (source.source)
{
bool singular = source.source->Singular();
// The slot holds hDummy_ when an earlier poll pass already saw this source fire.
bool bPollTriggered = this->handleArrayOr_[queueIterator.startingIndexOr] == this->hDummy_;
if (!bPollTriggered)
{
// A source counts as triggered if it owns the handle that woke us, or if a
// zero-timeout wait on one of its handles succeeds.
if (!singular)
{
for (const auto &handle : source.source->GetHandles())
{
if ((firstTriggered == handle) ||
(::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
{
lastHandle = handle;
wasTriggered = true;
break;
}
}
}
else
{
auto handle = source.source->GetHandle();
if ((firstTriggered == handle) ||
(::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
{
// NOTE(review): records firstTriggered rather than `handle`; these differ when the zero-timeout
// wait (not the equality test) matched. Confirm whether `handle` was intended.
lastHandle = firstTriggered;
wasTriggered = true;
}
}
}
if (bPollTriggered || (wasTriggered && source.source->OnTrigger(lastHandle)))
{
bool failing {};
if (!lastHandle)
{
lastHandle = (AuUInt)this->handleArrayAnd_[queueIterator.startingIndexAnd];
}
if (poll)
{
if (!bPollTriggered)
{
// Remember the real handle and park hDummy_ in its slot.
failing = !AuTryInsert(this->handleArrayTainted_, AuMakeTuple(queueIterator.startingIndexAnd, 1, reinterpret_cast<HANDLE>(lastHandle)));
if (!failing)
{
triggeredCount++;
this->handleArrayAnd_[queueIterator.startingIndexAnd ] = reinterpret_cast<HANDLE>(this->hDummy_);
this->handleArrayOr_[queueIterator.startingIndexOr ] = reinterpret_cast<HANDLE>(this->hDummy_);
}
}
else
{
triggeredCount++;
}
}
if (!poll)
{
// Undo a previous poll pass: take the record out of the tainted list and restore the real handle.
if (this->handleArrayTainted_.size() &&
bPollTriggered)
{
AuTuple<int, int, HANDLE> ref;
bool matched = AuRemoveIf(this->handleArrayTainted_, [&](const AuTuple<int, int, HANDLE> &in) -> bool
{
bool bMatched = queueIterator.startingIndexAnd == AuGet<0>(in);
if (!bMatched)
{
return false;
}
ref = in;
return true;
});
if (matched)
{
if (AuGet<1>(ref) == 1)
{
auto handle = AuGet<2>(ref);
this->handleArrayOr_[queueIterator.startingIndexOr] = handle;
this->handleArrayAnd_[queueIterator.startingIndexAnd] = handle;
}
}
}
}
// Dispatch now, unless this is a poll pass that managed to record the trigger.
if (!poll || failing)
{
triggeredCount++;
shouldRemove = true;
// -1: OnTrigger has already been evaluated above, so DoWork must skip it.
auto [ticked, bShouldRemove] = source.DoWork(this, -1);
shouldRemove = bShouldRemove;
if (trigger)
{
AuTryInsert(*trigger, source.source);
}
}
}
}
source.source->OnFinishSleep();
if (shouldRemove)
{
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
SysPanic("?");
}
else
{
AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size());
int sOffset = -(uHandles);
if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS)
{
// NOTE(review): RemoveSourceNB erases from loopSourceExs_ while queueIterator is still live, its
// "needs rebuild" result is discarded, and Next() is then called below; verify iterator validity.
this->RemoveSourceNB(source.source);
this->willCommitInFuture_ = true;
}
else
{
// indexOffset is never modified in this function, so these offsets are always +0.
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles);
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
continue;
}
}
}
//else
{
queueIterator.Next();
}
}
}
}
else
{
// Nothing fired: use the opportunity to evict sources whose deadline has passed.
ConsiderEvicitingTimeoutsAll();
}
StartUserAndTakeOwn_Release();
return triggeredCount;
}
// Upgrades the currently held read lock to a write lock and marks the queue as being
// inside a dispatch pass, so that Commit() calls made meanwhile are deferred.
// Paired with StartUserAndTakeOwn_Release.
void LoopQueue::StartUserAndTakeOwn()
{
this->rwMutex_->UpgradeReadToWrite(0);
this->isCommitableInFuture_ = true;
}
bool LoopQueue::TryPumpWin32()
{
bool bMsgPump {};
if (!this->msgSource_)
{
return false;
}
if (AuStaticCast<Win32Dummy>(this->msgSource_)->bIsPumping_)
{
MSG msg;
try
{
while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE))
{
TranslateMessage(&msg);
DispatchMessageW(&msg);
bMsgPump = true;
}
}
catch (...)
{
SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception.");
}
}
else
{
MSG msg;
if (PeekMessageW(&msg, NULL, 0, 0, PM_NOREMOVE))
{
bMsgPump = true;
}
}
// Notify all and remove if unwanted
if (bMsgPump)
{
AU_LOCK_GUARD(this->sourceMutex_);
bool bShouldRemove = true;
for (const auto &handler : this->msgCallbacks_.base)
{
try
{
bShouldRemove &= handler->OnFinished(this->msgSource_);
}
catch (...)
{
SysPushErrorCatch();
}
}
AuUInt i {};
for (const auto &handler : this->msgCallbacks_.extended)
{
try
{
bShouldRemove &= handler->OnFinished(this->msgSource_, i++);
}
catch (...)
{
SysPushErrorCatch();
}
}
for (const auto &handler : this->globalSubcribers_)
{
try
{
bShouldRemove &= handler->OnFinished(this->msgSource_);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (bShouldRemove)
{
this->bIsWinLoop_ = false;
}
}
return bMsgPump;
}
// Ends a dispatch pass started by StartUserAndTakeOwn: runs a commit that was deferred
// during the pass (while the write lock is still held), then downgrades back to the
// read lock.
void LoopQueue::StartUserAndTakeOwn_Release()
{
this->isCommitableInFuture_ = false;
// NOTE(review): willCommitInFuture_ is not cleared here, nor in CommitLocked; once raised it appears to
// trigger a commit on every later release. Confirm whether it is reset elsewhere.
if (this->willCommitInFuture_)
{
this->CommitLocked();
}
this->rwMutex_->DowngradeWriteToRead();
}
// Evicts a source whose deadline has passed.
// The caller must own the handle arrays (both call sites run this between
// StartUserAndTakeOwn and StartUserAndTakeOwn_Release).
//
// Walks the committed sources, calling OnFinishSleep on each one visited. The first
// source whose ConsiderTimeout reports expiry is removed from the arrays and the walk
// stops there. When the "or" array is too large to patch in place, it is rebuilt from
// the "and" array afterwards.
void LoopQueue::ConsiderEvicitingTimeoutsAll()
{
Iterator queueIterator(this);
bool bRebuildFromAnd {};
auto now = AuTime::SteadyClockMS();
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
auto &source = *queueIterator.itr;
if (!source.ConsiderTimeout(now))
{
// Not expired: advance first; `source` still refers to the element just visited.
queueIterator.Next();
source.source->OnFinishSleep();
continue;
}
source.source->OnFinishSleep();
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
// Null message loop hack
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
}
else
{
auto count = source.source->Singular() ? 1 : source.source->GetHandles().size();
if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS)
{
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
else
{
bRebuildFromAnd = true;
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
// NOTE(review): stops after the first eviction; any other expired sources wait for a later call. Confirm intent.
break;
}
queueIterator.Next();
}
if (bRebuildFromAnd)
{
BuildFromAndArray();
}
}
// Queues a one-shot epilogue hook; PumpHooks runs and discards the queued hooks.
// Returns false if the hook could not be stored.
// NOTE(review): epilogueHooks_ is accessed without any lock here and in PumpHooks; confirm the threading contract.
bool LoopQueue::AddHook(const AuFunction<void()> &func)
{
return AuTryInsert(this->epilogueHooks_, func);
}
// Runs every queued epilogue hook exactly once.
// The hook list is swapped out for an empty one first, so hooks registered while
// pumping are left for the next call.
void LoopQueue::PumpHooks()
{
    auto pendingHooks = AuExchange(this->epilogueHooks_, {});

    for (auto &hook : pendingHooks)
    {
        hook();
    }
}
// Exported factory: creates a new NT loop queue behind the ILoopQueue interface.
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
{
return AuMakeShared<LoopQueue>();
}
}