/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: LoopQueue.NT.cpp Date: 2022-1-8 Author: Reece ***/ #include #include "Loop.NT.hpp" #include "ILoopSourceEx.hpp" #include "LSWin32.NT.hpp" #include "LoopQueue.NT.hpp" static bool WaitToRetStatus(DWORD ret) { return !((ret == WAIT_TIMEOUT) || (ret == WAIT_IO_COMPLETION) || (ret == WAIT_FAILED)); } static bool WaitStatusFromAligned(DWORD ret, bool active) { return (WaitToRetStatus(ret) && ((active) || (ret != WAIT_OBJECT_0 /*lock releaser*/))); } namespace Aurora::Loop { LoopQueue::LoopQueue() { this->hEvent_ = CreateEventW(NULL, false, false, NULL); } LoopQueue::~LoopQueue() { AuWin32CloseHandle(this->hEvent_); } bool LoopQueue::IsValid() { return this->hEvent_ != INVALID_HANDLE_VALUE && this->handleArray_.size(); } struct LoopJoinable { LoopQueue *that; void Lock() { that->Sync(); } void Unlock() { that->Unlock(); } }; void LoopQueue::Sync() { if (this->hEvent_ != INVALID_HANDLE_VALUE) { this->rwMutex_->AsWritable()->Lock(); return; } SetEvent(this->hEvent_); this->rwMutex_->AsWritable()->Lock(); ResetEvent(this->hEvent_); } void LoopQueue::Unlock() { this->rwMutex_->AsWritable()->Unlock(); } bool LoopQueue::SourceAdd(const AuSPtr &source) { AU_LOCK_GUARD(this->sourceMutex_); if (!AuTryInsert(this->sources_, AuMakeTuple(source, 0))) { return false; } return true; } bool LoopQueue::SourceAddWithTimeout(const AuSPtr &source, AuUInt32 maxTimeout) { AU_LOCK_GUARD(this->sourceMutex_); if (!AuTryInsert(this->sources_, AuMakeTuple(source, AuTime::CurrentInternalClockMS() + maxTimeout))) { return false; } return true; } bool LoopQueue::SourceRemove(const AuSPtr &source) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->removedSources_, source); } void LoopQueue::RemoveSourceNB(const AuSPtr &source, bool removeHandles) { AU_LOCK_GUARD(this->sourceMutex_); // me AuTryRemoveByTupleN<0>(this->sources_, source); // guarded by ^ AuRemoveIf(this->loopSourceExs_, [source](const auto &re) { return re.sourceBase == source; }); // guarded by the loop joinable if (source->GetType() == ELoopSource::eSourceWin32) { this->msgSource_ = {}; this->msgCallbacks_ = {}; this->bIsWinLoop_ = false; return; } if (!removeHandles) { return; } AuUInt handleIndex {}; for (const auto &sourceEx : this->loopSourceExs_) { AuUInt lastHandle {}; bool wasTriggered {}; auto handles = sourceEx.source->GetHandles(); auto lsStartIndex = handleIndex; auto delta = handles.size(); if ((handleIndex % 64) == 0) { handleIndex++; } handleIndex += delta; if (sourceEx.source == source) { AuRemoveRange(this->handleArray_, lsStartIndex, delta); } } } bool LoopQueue::Commit() { LoopJoinable joinable {this}; AU_LOCK_GUARD(joinable); try { for (const auto &re : this->removedSources_) { RemoveSourceNB(re, this->sources_.empty()); removedSources_.clear(); } if (this->sources_.empty() && this->removedSources_.size()) { return true; } // Reset relevant OS cache this->bIsWinLoop_ = false; this->bIsThreadSafe_ = false; AuTryClear(this->loopSourceExs_); AuTryClear(this->handleArray_); // Reserve the cache arrays for initialization this->loopSourceExs_.reserve(this->sources_.size()); this->handleArray_.reserve(this->sources_.size()); // for (const auto &[source, timeout] : sources_) { // Filter bad sources if (!source) { continue; } // Win32 edge case if (source->GetType() == ELoopSource::eSourceWin32) { this->bIsWinLoop_ = true; this->msgSource_ = source; continue; } // Handle known ILoopSourceEx handle objects if (auto extended = AuDynamicCast(source)) { ExtendeSourceInfo t {extended}; t.timeoutAbs = timeout; t.sourceBase = source; t.source = extended; this->loopSourceExs_.push_back(t); if ((this->handleArray_.size() % MAXIMUM_WAIT_OBJECTS) == 0) { if (this->hEvent_ != INVALID_HANDLE_VALUE) { this->bIsThreadSafe_ = true; this->handleArray_.push_back(hEvent_); } } for (const auto &handle : extended->GetHandles()) { auto nthandle = reinterpret_cast(handle); this->handleArray_.push_back(nthandle); } } } return true; } catch (...) { AuTryClear(this->loopSourceExs_); AuTryClear(this->handleArray_); return {}; } } AuUInt32 LoopQueue::GetSourceCount() { return this->loopSourceExs_.size(); } void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) { this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS)); this->slowTickMs_ = sectionTickTime ? sectionTickTime : 1; } void LoopQueue::ChugHint(bool value) { this->forceChug_ = value; } bool LoopQueue::AddCallback(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); if (!source) { SysPushErrorArg(); return {}; } if (!subscriber) { SysPushErrorArg(); return {}; } if (source->GetType() == ELoopSource::eSourceWin32) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->msgCallbacks_, subscriber); } AuUInt32 count {}; for (auto &sourceEx : this->loopSourceExs_) { if (sourceEx.sourceBase != source) { continue; } AU_LOCK_GUARD(sourceEx.lock); if (!AuTryInsert(sourceEx.specificSubscribers, subscriber)) { return {}; } count++; } return count; } bool LoopQueue::AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); if (!source) { SysPushErrorArg(); return {}; } if (!subscriber) { SysPushErrorArg(); return {}; } if (source->GetType() == ELoopSource::eSourceWin32) { return false; } AuUInt32 count {}; for (auto &sourceEx : this->loopSourceExs_) { if (sourceEx.sourceBase != source) { continue; } AU_LOCK_GUARD(sourceEx.lock); if (!AuTryInsert(sourceEx.timeoutSubscribers, subscriber)) { return {}; } count++; } return count; } bool LoopQueue::AddCallback(const AuSPtr &subscriber) { AU_LOCK_GUARD(rwMutex_->AsReadable()); if (!subscriber) { SysPushErrorArg(); return {}; } // yes, this seems insane; however, the spinlock is basically free. // disturbing the rwMutex_ would mean spuriously waking up all the Wait[...]Nb functions // unlikely, but whats even less likely is for each sources spinlock to be held. // this should be a work init-bound function, not wait or commit, so the vector abuse doesnt matter so much for (auto &sourceEx : this->loopSourceExs_) { AU_LOCK_GUARD(sourceEx.lock); // hack lock, but its a spin zoomer if (!AuTryInsert(sourceEx.globalSubscribers, subscriber)) { return {}; } } { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->msgCallbacks_, subscriber); } return true; } bool LoopQueue::HasFinished() { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); AuUInt32 askers {}; return WaitAnyNBSpurious(INFINITE, askers, nullptr, true) == this->sources_.size(); } bool LoopQueue::IsSignaled() { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); AuUInt32 askers {}; return WaitAnyNBSpurious(0, askers, nullptr, true); } bool LoopQueue::WaitAll(AuUInt32 timeout) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); bool bReturnStatus {true}; bool bTimeout {false}; AuUInt32 count {}; AuUInt32 index {}; if (this->handleArray_.empty()) { return true; } count = this->handleArray_.size(); AuUInt32 startTime = AuTime::CurrentInternalClockMS(); AuUInt32 endTime = startTime + timeout; bool active = this->hEvent_ == INVALID_HANDLE_VALUE; while (count != index) { auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS)); startTime = AuTime::CurrentInternalClockMS(); if (endTime <= startTime) { ConsiderEvicitingTimeoutsAll(); return false; } auto timeDelta = endTime - startTime; // TODO: cap to last obj DWORD status {}; if (this->bIsWinLoop_) { status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL); } else { status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, true, timeDelta, true); } if (WaitStatusFromAligned(status, active)) { index += count; // continue the sleep after succesfully passing MAXIMUM_WAIT_OBJECTS or [..., EOS] objects continue; } // Account for updates if (status == WAIT_OBJECT_0 && !active) { count = this->handleArray_.size(); if (count == 0) { return true; } else if (index > count) { index = 0; } continue; } // Account for win32 failures if (status == WAIT_FAILED) { bReturnStatus = false; break; } // Ingore the other unrelated errors (APC notification, timeout, etc) } ConsiderEvicitingTimeoutsAll(); return bReturnStatus; } AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout) { AuUInt32 ret {}; bool lastItr {}; AuUInt32 startTime = AuTime::CurrentInternalClockMS(); AuUInt32 endTime = timeout ? (startTime + timeout) : INFINITE; AuUInt32 chuggerIndex {}; do { if (WaitAnyNBSpurious(endTime, chuggerIndex, nullptr, false)) { ret++; } } while (WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return ret; } AuList> LoopQueue::WaitAnyEx(AuUInt32 timeout) { AuList> trigger; AuUInt32 startTime = AuTime::CurrentInternalClockMS(); AuUInt32 endTime = timeout ? (startTime + timeout) : INFINITE; AuUInt32 chuggerIndex {}; do { if (WaitAnyNBSpurious(endTime, chuggerIndex, &trigger, false)) { return trigger; } } while (WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return trigger; } bool LoopQueue::ChugWaitAny(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered) { AuUInt32 count {}; AuUInt32 &index {chuggerIndex}; count = this->handleArray_.size(); indexOfTriggered = -1; if (index > count) index = 0; bool active = this->hEvent_ == INVALID_HANDLE_VALUE; while (count != index) { auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS)); if (timeout && timeout <= AuTime::CurrentInternalClockMS()) { return false; } DWORD status {}; if (this->bIsWinLoop_) { status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, this->slowTickMs_, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE); if (status == next) { return this->handleArray_.size(); } } else { status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, false, this->slowTickMs_, true); } if (WaitStatusFromAligned(status, active)) { DWORD offset = status - WAIT_OBJECT_0; indexOfTriggered = offset + index; return true; } if (status == WAIT_OBJECT_0 && !active) { return false; } if (status == WAIT_FAILED) { return false; } index += count; } return false; } bool LoopQueue::WaitAnyNBSpurious(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuList> *trigger, bool poll) { bool status {}; DWORD temp; AuUInt32 indexOfTriggered {}; AuUInt32 triggeredCount {}; AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me if (trigger) { trigger->reserve(this->sources_.size()); } for (const auto &source : this->loopSourceExs_) { source.source->OnPresleep(); } if ((this->handleArray_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_)) { status = ChugWaitAny(timeout, chuggerIndex, indexOfTriggered); } else { auto now = AuTime::CurrentInternalClockMS(); if (timeout && timeout <= now) { return false; } auto sleepDelta = timeout - now; if (this->bIsWinLoop_) { temp = ::MsgWaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE); } else { temp = ::WaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), false, sleepDelta, true); } status = WaitToRetStatus(temp); if (status) { indexOfTriggered = temp - WAIT_OBJECT_0; } } bool isPump = this->handleArray_.size() == indexOfTriggered; AuUInt firstTriggered {}; if (status) { if (!isPump) { firstTriggered = reinterpret_cast(this->handleArray_[indexOfTriggered]); } AuUInt handleIndex {}; for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; ) { auto &source = *itr; AuUInt lastHandle {}; bool wasTriggered {}; auto handles = source.source->GetHandles(); if ((handleIndex % 64) == 0) { handleIndex++; } auto lsStartIndex = handleIndex; for (const auto &handle : handles) { handleIndex++; if ((firstTriggered == handle) || (::WaitForSingleObject(reinterpret_cast(handle), 0) == WAIT_OBJECT_0)) { lastHandle = handle; wasTriggered = true; break; } } bool shouldRemove {false}; if (wasTriggered && source.source->OnTrigger(lastHandle)) { triggeredCount++; shouldRemove = true; AU_LOCK_GUARD(const_cast(&source.lock)) // this spinlock really does feel like a hack for (const auto &handler : source.specificSubscribers) { try { shouldRemove &= handler->OnFinished(source.source); } catch (...) { SysPushErrorCatch(); } } if (shouldRemove) { for (const auto &handler : source.timeoutSubscribers) { try { shouldRemove &= handler->OnFinished(source.source); } catch (...) { SysPushErrorCatch(); } } } if (shouldRemove) { for (const auto &handler : source.globalSubscribers) { try { shouldRemove &= handler->OnFinished(source.source); } catch (...) { SysPushErrorCatch(); } } } if ((source.specificSubscribers.empty()) && (source.globalSubscribers.empty()) && (source.timeoutSubscribers.empty())) { shouldRemove = false; } } if (shouldRemove) { if (trigger) { AuTryInsert(*trigger, source.source); } } if (source.ConsiderTimeout()) { shouldRemove = true; } source.source->OnFinishSleep(); if (shouldRemove) { // Evict from OS cache AuRemoveRange(this->handleArray_, lsStartIndex, handles.size()); { AU_LOCK_GUARD(this->sourceMutex_); AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase); } if (source.source->GetType() == ELoopSource::eSourceWin32) { this->msgSource_ = {}; this->msgCallbacks_ = {}; this->bIsWinLoop_ = false; } itr = this->loopSourceExs_.erase(itr); handleIndex -= handles.size(); } else { itr++; } } } else { ConsiderEvicitingTimeoutsAll(); } if (isPump) { triggeredCount++; // Notify all and remove if unwanted { AU_LOCK_GUARD(this->sourceMutex_); AuTryInsert(*trigger, this->msgSource_); bool shouldRemove {true}; for (const auto &handler : this->msgCallbacks_) { try { shouldRemove &= handler->OnFinished(this->msgSource_); } catch (...) { SysPushErrorCatch(); } } if (shouldRemove) { this->bIsWinLoop_ = false; } } // Do the work of pumping the Win32 event queue if the user didn't flush the queue during the OnFinished callback { if (AuStaticCast(this->msgSource_)->bIsPumping_) { MSG msg; try { while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE)) { TranslateMessage(&msg); DispatchMessageW(&msg); } } catch (...) { SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception."); } } } } return triggeredCount; } void LoopQueue::ConsiderEvicitingTimeoutsAll() { AuUInt32 startIndex {}; for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; ) { auto &source = *itr; if ((startIndex % 64) == 0) { startIndex++; } auto count = source.source->GetHandles().size(); auto begin = startIndex; startIndex += count; if (!source.ConsiderTimeout()) { itr++; continue; } // Evict from OS cache AuRemoveRange(this->handleArray_, startIndex, count); { AU_LOCK_GUARD(this->sourceMutex_); AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase); } if (source.source->GetType() == ELoopSource::eSourceWin32) { this->msgSource_ = {}; this->msgCallbacks_ = {}; this->bIsWinLoop_ = false; } itr = this->loopSourceExs_.erase(itr); } } AUKN_SYM AuSPtr NewLoopQueue() { return AuMakeShared(); } }