/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: LoopQueue.NT.cpp Date: 2022-1-8 Author: Reece Note: Win32 sewage lmao ***/ #include #include "Loop.NT.hpp" #include "ILoopSourceEx.hpp" #include "LSWin32.NT.hpp" #include "LoopQueue.NT.hpp" static bool WaitToRetStatus(DWORD ret) { return !((ret == WAIT_TIMEOUT) || (ret == WAIT_IO_COMPLETION) || (ret == WAIT_FAILED)); } static bool WaitStatusFromAligned(DWORD ret, bool active) { return (WaitToRetStatus(ret) && ((active) || (ret != WAIT_OBJECT_0 /*lock releaser*/))); } namespace Aurora::IO::Loop { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Loop Queue :: Extended Handle ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// LoopQueue::ExtendedSourceInfo::ExtendedSourceInfo(const AuSPtr &in) : source(in), timeoutAbs(0) { } bool LoopQueue::ExtendedSourceInfo::ConsiderTimeout(AuUInt64 time) const { if (!this->timeoutAbs) { return false; } if (time < this->timeoutAbs) { return false; } for (const auto &handler : callbacks.extended) { try { handler->OnTimeout(sourceBase); } catch (...) { SysPushErrorCatch(); } } return true; } AuPair LoopQueue::ExtendedSourceInfo::DoWork(LoopQueue *queue, AuUInt handle) { bool bShouldRemove {true}; AuUInt8 uPosition {}; if (this->source && handle != -1) { if (!this->source->OnTrigger(handle)) { return {}; } } bool bOverload {}; { AU_LOCK_GUARD(&this->lock); if ((this->callbacks.base.empty()) && (this->callbacks.extended.empty())) { bOverload = true; } // Notify callbacks... for (auto itr = this->callbacks.extended.begin(); itr != this->callbacks.extended.end(); ) { bool result; auto handler = *itr; try { result = handler->OnFinished(this->sourceBase, uPosition++); } catch (...) { SysPushErrorCatch(); } bShouldRemove &= result; if (result) { itr = this->callbacks.extended.erase(itr); } else { itr++; } } for (auto itr = this->callbacks.base.begin(); itr != this->callbacks.base.end(); ) { bool result; auto handler = *itr; try { result = handler->OnFinished(this->sourceBase); } catch (...) { SysPushErrorCatch(); } bShouldRemove &= result; if (result) { itr = this->callbacks.base.erase(itr); } else { itr++; } } } // Evict when subs count hit zero, not when sub count started off at zero if (bOverload) { bShouldRemove = false; } // Notify global subscribers, allowing them to preempt removal if (bShouldRemove || bOverload) { AU_LOCK_GUARD(queue->sourceMutex_); for (const auto &handler : queue->globalSubcribers_) { try { bShouldRemove &= handler->OnFinished(this->sourceBase); } catch (...) { SysPushErrorCatch(); } } } return AuMakePair(true, bShouldRemove); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Loop Queue ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// LoopQueue::LoopQueue() { this->hEvent_ = ::CreateEventW(NULL, true, false, NULL); this->hDummy_ = ::CreateEventW(NULL, true, true, NULL); } LoopQueue::~LoopQueue() { AuWin32CloseHandle(this->hEvent_); AuWin32CloseHandle(this->hDummy_); } bool LoopQueue::Init() { return true; } bool LoopQueue::IsValid() { return this->hEvent_ != INVALID_HANDLE_VALUE && this->handleArrayOr_.size() && this->handleArrayAnd_.size(); } struct LoopJoinable { LoopQueue *that; void Lock() { that->Sync(); } bool TryLock() { return that->Sync2(); } void Unlock() { that->Unlock(); } }; void LoopQueue::Sync() { if (this->hEvent_ == INVALID_HANDLE_VALUE) { this->rwMutex_->AsWritable()->Lock(); return; } ::SetEvent(this->hEvent_); this->rwMutex_->AsWritable()->Lock(); ::ResetEvent(this->hEvent_); } bool LoopQueue::Sync2() { if (this->hEvent_ != INVALID_HANDLE_VALUE) { ::SetEvent(this->hEvent_); } if (!this->rwMutex_->AsWritable()->TryLock()) { return false; } if (this->hEvent_ != INVALID_HANDLE_VALUE) { ::ResetEvent(this->hEvent_); } return true; } void LoopQueue::Unlock() { this->rwMutex_->AsWritable()->Unlock(); } bool LoopQueue::SourceAdd(const AuSPtr &source) { AU_LOCK_GUARD(this->sourceMutex_); if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, 0, SourceCallbacks {}))) { return false; } return true; } bool LoopQueue::SourceAddWithTimeout(const AuSPtr &source, AuUInt32 maxTimeout) { AU_LOCK_GUARD(this->sourceMutex_); if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, AuTime::SteadyClockMS() + maxTimeout, SourceCallbacks {}))) { return false; } return true; } bool LoopQueue::SourceRemove(const AuSPtr &source) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->removedSources_, source) || AuTryRemoveByTupleN<0>(this->addedSources_, source); } bool LoopQueue::RemoveSourceNB(const AuSPtr &source) { if (source->GetType() == ELoopSource::eSourceWin32) { this->msgSource_.reset(); this->msgCallbacks_ = {}; this->bIsWinLoop_ = false; return {}; } bool bRebuildFromAnd {}; Iterator queueIterator(this); for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; ) { if (queueIterator.itr->sourceBase == source) { if (queueIterator.itr->source) { auto count = queueIterator.itr->source->Singular() ? 1 : queueIterator.itr->source->GetHandles().size(); if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS) { AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count); AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } else { bRebuildFromAnd = true; AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); break; } else { queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); break; } } queueIterator.Next(); } return bRebuildFromAnd; } bool LoopQueue::Commit() { if (this->isCommitableInFuture_) { this->willCommitInFuture_ = true; return true; } LoopJoinable joinable {this}; AU_TRY_LOCK_GUARD_RET_DEF(joinable); return CommitLocked(); } void LoopQueue::BuildFromAndArray() { this->handleArrayOr_.clear(); this->handleArrayOr_.reserve(this->handleArrayAnd_.size() + 16); for (int i = 0; i < this->handleArrayAnd_.size(); i++) { if (((this->handleArrayOr_.size() % MAXIMUM_WAIT_OBJECTS)) == 0) { this->handleArrayOr_.push_back(this->hEvent_); } this->handleArrayOr_.push_back(this->handleArrayAnd_[i]); } } bool LoopQueue::CommitLocked() { try { bool bShouldRebuild {}; for (const auto &re : this->removedSources_) { bShouldRebuild |= this->RemoveSourceNB(re); } // Reserve the cache arrays for initialization this->loopSourceExs_.reserve(this->loopSourceExs_.size() + this->addedSources_.size()); this->handleArrayAnd_.reserve(this->loopSourceExs_.size() + this->addedSources_.size()); this->handleArrayOr_.reserve(this->loopSourceExs_.size() + this->addedSources_.size()); bShouldRebuild |= bool(this->addedSources_.size()); // for (auto &[source, timeout, callbacks] : this->addedSources_) { // Filter bad sources if (!source) { continue; } // Win32 edge case if (source->GetType() == ELoopSource::eSourceWin32) { this->bIsWinLoop_ = true; this->msgSource_ = source; this->msgCallbacks_ = AuMove(callbacks); continue; } // Handle known ILoopSourceEx handle objects if (auto extended = AuDynamicCast(source)) { ExtendedSourceInfo t {extended}; t.timeoutAbs = timeout; t.sourceBase = source; t.source = extended; t.callbacks = AuMove(callbacks); this->loopSourceExs_.push_back(t); if (extended->Singular()) { const auto handle = extended->GetHandle(); auto nthandle = reinterpret_cast(handle); this->handleArrayAnd_.push_back(nthandle); } else { for (const auto &handle : extended->GetHandles()) { auto nthandle = reinterpret_cast(handle); this->handleArrayAnd_.push_back(nthandle); } } } } for (const auto &re : this->removedSources_) { bShouldRebuild |= this->RemoveSourceNB(re); } AuTryClear(this->addedSources_); AuTryClear(this->removedSources_); if (bShouldRebuild) { BuildFromAndArray(); } return true; } catch (...) { return {}; } } AuUInt32 LoopQueue::GetSourceCount() { return this->loopSourceExs_.size(); } void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) { this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS)); this->slowTickMs_ = sectionTickTime ? sectionTickTime : 1; } void LoopQueue::ChugHint(bool value) { this->forceChug_ = value; } bool LoopQueue::AddCallback(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); if (!source) { SysPushErrorArg(); return {}; } if (!subscriber) { SysPushErrorArg(); return {}; } AuUInt32 count {}; for (auto &sourceEx : this->loopSourceExs_) { if (sourceEx.sourceBase != source) { continue; } AU_LOCK_GUARD(sourceEx.lock); if (!AuTryInsert(sourceEx.callbacks.base, subscriber)) { return {}; } count++; } if (!count) { AU_LOCK_GUARD(this->sourceMutex_); for (auto &[innerSource, timeout, callbacks] : this->addedSources_) { if (innerSource == source) { if (!AuTryInsert(callbacks.base, subscriber)) { return {}; } count++; } } } if (source->GetType() == ELoopSource::eSourceWin32) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->msgCallbacks_.base, subscriber); } return count; } bool LoopQueue::AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); if (!source) { SysPushErrorArg(); return {}; } if (!subscriber) { SysPushErrorArg(); return {}; } AuUInt32 count {}; for (auto &sourceEx : this->loopSourceExs_) { if (sourceEx.sourceBase != source) { continue; } // IMPORTANT: Soft-locks on this line, whose stack traces contain ExtendedSourceInfo::DoWork, are // a re-add during io-tick before removal condition. AU_LOCK_GUARD(sourceEx.lock); if (!AuTryInsert(sourceEx.callbacks.extended, subscriber)) { return {}; } count++; } if (!count) { AU_LOCK_GUARD(this->sourceMutex_); for (auto &[innerSource, timeout, callbacks] : this->addedSources_) { if (innerSource == source) { if (!AuTryInsert(callbacks.extended, subscriber)) { return {}; } count++; } } } if (source->GetType() == ELoopSource::eSourceWin32) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->msgCallbacks_.extended, subscriber); } return count; } bool LoopQueue::AddCallback(const AuSPtr &subscriber) { AU_LOCK_GUARD(rwMutex_->AsReadable()); if (!subscriber) { SysPushErrorArg(); return {}; } AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->globalSubcribers_, subscriber); } bool LoopQueue::IsSignaledPeek() { AuUInt32 askers {}; bool bAskers {}; return WaitAnyNBSpurious(0, askers, nullptr, true, bAskers); } bool LoopQueue::WaitAll(AuUInt32 timeout) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); bool bReturnStatus {false}; bool bTimeout {false}; AuUInt32 count {}; AuUInt32 index {}; if (this->handleArrayAnd_.empty()) { return true; } count = this->handleArrayAnd_.size(); AuUInt64 startTime = AuTime::SteadyClockNS(); AuUInt64 endTime = startTime + AuMSToNS(timeout); for (const auto &source : this->loopSourceExs_) { source.source->OnPresleep(); } while (count != index) { auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS)); startTime = AuTime::SteadyClockNS(); if (timeout) { if (endTime <= startTime) { StartUserAndTakeOwn(); ConsiderEvicitingTimeoutsAll(); StartUserAndTakeOwn_Release(); return false; } } auto timeDelta = AuNSToMS(endTime - startTime); // TODO: cap to last obj DWORD status {}; do { if (this->bIsWinLoop_) { status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL); } else { status = ::WaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, true, timeDelta, true); } } while (status == WAIT_IO_COMPLETION); if (status == WAIT_OBJECT_0 + next) { TryPumpWin32(); continue; } if (WaitToRetStatus(status)) { index += (status + 1); continue; } // Account for win32 failures if (status == WAIT_FAILED) { bReturnStatus = false; break; } // Ingore the other unrelated errors (APC notification, timeout, etc) } // OK - All signals are set // Take ownership of the queue ready for a potential purge of objects StartUserAndTakeOwn(); // Le great iterate Iterator queueIterator(this); AuSInt indexOffset {}; auto now = AuTime::SteadyClockMS(); for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; ) { auto &source = *queueIterator.itr; auto [ticked, bShouldRemove] = source.DoWork(this, source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]); if (!bShouldRemove && source.ConsiderTimeout(now)) { bShouldRemove = true; } source.source->OnFinishSleep(); if (ticked) { bReturnStatus |= true; } if (this->handleArrayOr_[queueIterator.startingIndexOr] == AuReinterpretCast(this->hDummy_)) { auto handle = AuReinterpretCast(source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]); this->handleArrayAnd_[queueIterator.startingIndexAnd] = handle; this->handleArrayOr_[queueIterator.startingIndexOr] = handle; } if (bShouldRemove) { if (source.source->GetType() == ELoopSource::eSourceWin32) { SysPanic("?"); } else { AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size()); int sOffset = -((int)uHandles); if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) { this->RemoveSourceNB(source.source); this->willCommitInFuture_ = true; } else { AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles); AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles); queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); continue; } } } //else { queueIterator.Next(); } } StartUserAndTakeOwn_Release(); return bReturnStatus; } AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout) { AuUInt32 ret {}; bool lastItr {}; AuUInt64 startTime = AuTime::SteadyClockMS(); AuUInt64 endTime = timeout ? (startTime + timeout) : AuUInt64(-1); AuUInt32 chuggerIndex {}; bool bTriggerWin32 {}; do { if (WaitAnyNBSpurious(endTime, chuggerIndex, nullptr, false, bTriggerWin32)) { ret++; } PumpHooks(); } while (!bTriggerWin32 && WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return ret; } AuList> LoopQueue::WaitAnyEx(AuUInt32 timeout) { AuList> trigger; AuUInt64 startTime = AuTime::SteadyClockMS(); AuUInt64 endTime = timeout ? (startTime + timeout) : AuUInt64(-1); AuUInt32 chuggerIndex {}; bool bTriggerWin32 {}; do { if (WaitAnyNBSpurious(endTime, chuggerIndex, &trigger, false, bTriggerWin32)) { return trigger; } PumpHooks(); } while (!bTriggerWin32 && WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return trigger; } AuUInt32 LoopQueue::PumpNonblocking() { AuUInt32 ret {}; bool lastItr {}; bool bTriggerWin32 {}; AuUInt32 chuggerIndex {}; do { if (WaitAnyNBSpurious(0, chuggerIndex, nullptr, false, bTriggerWin32)) { ret++; } PumpHooks(); } while (!bTriggerWin32 && WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return ret; } AuList> LoopQueue::PumpNonblockingEx() { AuList> trigger; AuUInt32 chuggerIndex {}; bool bTriggerWin32 {}; do { if (WaitAnyNBSpurious(0, chuggerIndex, &trigger, false, bTriggerWin32)) { return trigger; } PumpHooks(); } while (!bTriggerWin32 && WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return trigger; } bool LoopQueue::ChugWaitAny(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered) { AuUInt32 count {}; AuUInt32 &index {chuggerIndex}; count = this->handleArrayOr_.size(); indexOfTriggered = -1; if (index > count) index = 0; bool active = this->hEvent_ == INVALID_HANDLE_VALUE; while (count != index ) { auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS)); auto sleepMS = this->slowTickMs_; if (internalEndTime && internalEndTime != AuUInt64(-1)) { auto now = AuTime::SteadyClockMS(); auto delta = AuInt64(internalEndTime) - AuInt64(now); if (delta <= 0) { return false; } sleepMS = AuMin(sleepMS, AuUInt32(delta)); } DWORD status {}; if (this->bIsWinLoop_) { do { status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, sleepMS, QS_ALLINPUT | QS_ALLPOSTMESSAGE, MWMO_ALERTABLE); { auto temp2 = status; while (temp2 == WAIT_IO_COMPLETION) { temp2 = SleepEx(0, true); } } } while (status == WAIT_IO_COMPLETION); if (status == next) { indexOfTriggered = this->handleArrayOr_.size(); return true; } } else { do { status = ::WaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, false, sleepMS, true); } while (status == WAIT_IO_COMPLETION); } if (WaitStatusFromAligned(status, active)) { DWORD offset = status - WAIT_OBJECT_0; indexOfTriggered = offset + index; return true; } if (status == WAIT_OBJECT_0 && !active) { return false; } if (status == WAIT_FAILED) { return false; } index += count; } return false; } bool LoopQueue::WaitAnyNBSpurious(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuList> *trigger, bool poll, bool &bTriggerWin32) { bool status {}; DWORD temp; AuUInt32 indexOfTriggered {}; AuUInt32 triggeredCount {}; AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me if (trigger) { trigger->reserve(this->loopSourceExs_.size()); } for (const auto &source : this->loopSourceExs_) { source.source->OnPresleep(); } if (AuExchange(this->bIOUWin32FastPath, false)) { status = this->handleArrayOr_.size(); } else if ((this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_)) { status = ChugWaitAny(internalEndTime, chuggerIndex, indexOfTriggered); } else { AuUInt32 sleepDelta {}; do { if (internalEndTime) { if (internalEndTime == AuUInt64(-1)) { sleepDelta = INFINITE; } else { auto now = AuTime::SteadyClockMS(); if (internalEndTime <= now) { return false; } sleepDelta = internalEndTime - now; } } else { sleepDelta = 0; } if (this->bIsWinLoop_) { // TODO: this might be a symptom of something else if (!AuSwInfo::IsWindows10OrGreater() && sleepDelta == INFINITE) { auto DoTryIf = [&] { temp = ::WaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), false, 4, true); return WaitToRetStatus(temp); }; if (!DoTryIf()) { temp = ::MsgWaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE); } } else { temp = ::MsgWaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE); } } else { temp = ::WaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), false, sleepDelta, true); } { auto temp2 = temp; while (temp2 == WAIT_IO_COMPLETION) { temp2 = SleepEx(0, true); } } } while (temp == WAIT_IO_COMPLETION); status = WaitToRetStatus(temp); if (status) { indexOfTriggered = temp - WAIT_OBJECT_0; } } bool bIsPump = this->handleArrayOr_.size() == indexOfTriggered; if (bIsPump) { if (poll) { this->bIOUWin32FastPath = true; return true; } else if (TryPumpWin32()) { triggeredCount++; bTriggerWin32 = true; } return triggeredCount; } else if (!status) { StartUserAndTakeOwn(); ConsiderEvicitingTimeoutsAll(); StartUserAndTakeOwn_Release(); return false; } else { static Aurora::Utility::RateLimiter limit(AuMSToNS(24)); if (limit.CheckExchangePass()) { if (!poll && TryPumpWin32()) { triggeredCount++; bTriggerWin32 = true; } } } StartUserAndTakeOwn(); AuUInt firstTriggered { reinterpret_cast(this->handleArrayOr_[indexOfTriggered]) }; { { Iterator queueIterator(this); AuSInt indexOffset {}; auto now = AuTime::SteadyClockMS(); for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; ) { bool shouldRemove {false}; auto &source = *queueIterator.itr; AuUInt lastHandle {}; bool wasTriggered {}; if (source.source) { bool singular = source.source->Singular(); bool bPollTriggered = this->handleArrayOr_[queueIterator.startingIndexOr] == this->hDummy_; if (!bPollTriggered) { if (!singular) { for (const auto &handle : source.source->GetHandles()) { if ((firstTriggered == handle) || (::WaitForSingleObject(reinterpret_cast(handle), 0) == WAIT_OBJECT_0)) { lastHandle = handle; wasTriggered = true; break; } } } else { auto handle = source.source->GetHandle(); if ((firstTriggered == handle) || (::WaitForSingleObject(reinterpret_cast(handle), 0) == WAIT_OBJECT_0)) { lastHandle = firstTriggered; wasTriggered = true; } } } if (bPollTriggered || (wasTriggered && source.source->OnTrigger(lastHandle))) { bool failing {}; if (!lastHandle) { lastHandle = (AuUInt)this->handleArrayAnd_[queueIterator.startingIndexAnd]; } if (poll) { if (!bPollTriggered) { failing = !AuTryInsert(this->handleArrayTainted_, AuMakeTuple(queueIterator.startingIndexAnd, 1, reinterpret_cast(lastHandle))); if (!failing) { triggeredCount++; this->handleArrayAnd_[queueIterator.startingIndexAnd ] = reinterpret_cast(this->hDummy_); this->handleArrayOr_[queueIterator.startingIndexOr ] = reinterpret_cast(this->hDummy_); } } else { triggeredCount++; } } if (!poll) { if (this->handleArrayTainted_.size() && bPollTriggered) { AuTuple ref; bool matched = AuRemoveIf(this->handleArrayTainted_, [&](const AuTuple &in) -> bool { bool bMatched = queueIterator.startingIndexAnd == AuGet<0>(in); if (!bMatched) { return false; } ref = in; return true; }); if (matched) { if (AuGet<1>(ref) == 1) { auto handle = AuGet<2>(ref); this->handleArrayOr_[queueIterator.startingIndexOr] = handle; this->handleArrayAnd_[queueIterator.startingIndexAnd] = handle; } } } } if (!poll || failing) { triggeredCount++; shouldRemove = true; auto [ticked, bShouldRemove] = source.DoWork(this, -1); shouldRemove = bShouldRemove; if (trigger) { AuTryInsert(*trigger, source.source); } } } } source.source->OnFinishSleep(); if (shouldRemove) { if (source.source->GetType() == ELoopSource::eSourceWin32) { SysPanic("?"); } else { AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size()); int sOffset = -(uHandles); if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) { this->RemoveSourceNB(source.source); this->willCommitInFuture_ = true; } else { AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles); AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles); queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); continue; } } } //else { queueIterator.Next(); } } } } StartUserAndTakeOwn_Release(); return triggeredCount; } void LoopQueue::StartUserAndTakeOwn() { this->rwMutex_->UpgradeReadToWrite(0); this->isCommitableInFuture_ = true; } bool LoopQueue::TryPumpWin32() { bool bMsgPump {}; if (!this->msgSource_) { return false; } if (AuStaticCast(this->msgSource_)->bIsPumping_) { MSG msg; try { while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE)) { TranslateMessage(&msg); DispatchMessageW(&msg); bMsgPump = true; } } catch (...) { SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception."); } } else { MSG msg; if (PeekMessageW(&msg, NULL, 0, 0, PM_NOREMOVE)) { bMsgPump = true; } } // Notify all and remove if unwanted if (bMsgPump) { AU_LOCK_GUARD(this->sourceMutex_); bool bShouldRemove = true; for (const auto &handler : this->msgCallbacks_.base) { try { bShouldRemove &= handler->OnFinished(this->msgSource_); } catch (...) { SysPushErrorCatch(); } } AuUInt i {}; for (const auto &handler : this->msgCallbacks_.extended) { try { bShouldRemove &= handler->OnFinished(this->msgSource_, i++); } catch (...) { SysPushErrorCatch(); } } for (const auto &handler : this->globalSubcribers_) { try { bShouldRemove &= handler->OnFinished(this->msgSource_); } catch (...) { SysPushErrorCatch(); } } if (bShouldRemove) { this->bIsWinLoop_ = false; } } return bMsgPump; } void LoopQueue::StartUserAndTakeOwn_Release() { this->isCommitableInFuture_ = false; if (this->willCommitInFuture_) { this->CommitLocked(); } this->rwMutex_->DowngradeWriteToRead(); } // callee must own handle array void LoopQueue::ConsiderEvicitingTimeoutsAll() { Iterator queueIterator(this); bool bRebuildFromAnd {}; auto now = AuTime::SteadyClockMS(); for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; ) { auto &source = *queueIterator.itr; if (!source.ConsiderTimeout(now)) { queueIterator.Next(); source.source->OnFinishSleep(); continue; } source.source->OnFinishSleep(); if (source.source->GetType() == ELoopSource::eSourceWin32) { // Null message loop hack this->msgSource_.reset(); this->msgCallbacks_ = {}; this->bIsWinLoop_ = false; } else { auto count = source.source->Singular() ? 1 : source.source->GetHandles().size(); if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS) { AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count); AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } else { bRebuildFromAnd = true; AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); break; } queueIterator.Next(); } if (bRebuildFromAnd) { BuildFromAndArray(); } } bool LoopQueue::AddHook(const AuFunction &func) { return AuTryInsert(this->epilogueHooks_, func); } void LoopQueue::PumpHooks() { auto c = AuExchange(this->epilogueHooks_, {}); for (auto &a : c) { a(); } } AUKN_SYM AuSPtr NewLoopQueue() { return AuMakeShared(); } }