/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: LoopQueue.NT.cpp Date: 2022-1-8 Author: Reece ***/ #include #include "Loop.NT.hpp" #include "ILoopSourceEx.hpp" #include "LSWin32.NT.hpp" #include "LoopQueue.NT.hpp" static bool WaitToRetStatus(DWORD ret) { return !((ret == WAIT_TIMEOUT) || (ret == WAIT_IO_COMPLETION) || (ret == WAIT_FAILED)); } static bool WaitStatusFromAligned(DWORD ret, bool active) { return (WaitToRetStatus(ret) && ((active) || (ret != WAIT_OBJECT_0 /*lock releaser*/))); } namespace Aurora::Loop { LoopQueue::LoopQueue() { this->hEvent_ = CreateEventW(NULL, false, false, NULL); } LoopQueue::~LoopQueue() { AuWin32CloseHandle(this->hEvent_); } bool LoopQueue::IsValid() { return this->hEvent_ != INVALID_HANDLE_VALUE && this->handleArrayOr_.size() && this->handleArrayAnd_.size(); } struct LoopJoinable { LoopQueue *that; void Lock() { that->Sync(); } void Unlock() { that->Unlock(); } }; void LoopQueue::Sync() { if (this->hEvent_ != INVALID_HANDLE_VALUE) { this->rwMutex_->AsWritable()->Lock(); return; } SetEvent(this->hEvent_); this->rwMutex_->AsWritable()->Lock(); ResetEvent(this->hEvent_); } void LoopQueue::Unlock() { this->rwMutex_->AsWritable()->Unlock(); } bool LoopQueue::SourceAdd(const AuSPtr &source) { AU_LOCK_GUARD(this->sourceMutex_); if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, 0, SourceCallbacks {}))) { return false; } return true; } bool LoopQueue::SourceAddWithTimeout(const AuSPtr &source, AuUInt32 maxTimeout) { AU_LOCK_GUARD(this->sourceMutex_); if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, AuTime::CurrentInternalClockMS() + maxTimeout, SourceCallbacks {}))) { return false; } return true; } bool LoopQueue::SourceRemove(const AuSPtr &source) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->removedSources_, source) || AuTryRemoveByTupleN<0>(this->addedSources_, source); } bool LoopQueue::RemoveSourceNB(const AuSPtr &source) { if (source->GetType() == ELoopSource::eSourceWin32) { this->msgSource_.reset(); this->msgCallbacks_ = {}; this->bIsWinLoop_ = false; return {}; } Iterator queueIterator(this); bool bRebuildFromAnd {}; for ( queueIterator.Start(); queueIterator.End() != queueIterator.itr; ) { if (queueIterator.itr->sourceBase == source) { if (queueIterator.itr->source) { auto count = queueIterator.itr->source->Singular() ? 1 : queueIterator.itr->source->GetHandles().size(); if (this->handleArrayOr_.size() <= 64) { AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count); AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } else { bRebuildFromAnd = true; AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); break; } else { queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); break; } } queueIterator.Next(); } return bRebuildFromAnd; } bool LoopQueue::Commit() { if (this->isCommitableInFuture_) { this->willCommitInFuture_ = true; return true; } LoopJoinable joinable {this}; AU_LOCK_GUARD(joinable); return CommitLocked(); } void LoopQueue::BuildFromAndArray() { this->handleArrayOr_.clear(); for (int i = 0; i < this->handleArrayAnd_.size(); i++) { if (((this->handleArrayOr_.size() % 64)) == 0) { this->handleArrayOr_.push_back(this->hEvent_); } this->handleArrayOr_.push_back(this->handleArrayAnd_[i]); } } bool LoopQueue::CommitLocked() { try { bool bShouldRebuild {}; for (const auto &re : this->removedSources_) { bShouldRebuild |= RemoveSourceNB(re); } removedSources_.clear(); //if (this->sources_.empty() && this->removedSources_.size()) //{ // return true; //} // Reset relevant OS cache this->bIsWinLoop_ = false; this->bIsThreadSafe_ = false; //AuTryClear(this->loopSourceExs_); // fuck we're losing shit //AuTryClear(this->handleArray_); // Reserve the cache arrays for initialization this->loopSourceExs_.reserve(this->loopSourceExs_.size() + this->addedSources_.size()); this->handleArrayAnd_.reserve(this->loopSourceExs_.size() + this->addedSources_.size()); this->handleArrayOr_.reserve(this->loopSourceExs_.size() + this->addedSources_.size()); bShouldRebuild |= this->addedSources_.size(); // for (auto &[source, timeout, callbacks] : this->addedSources_) { // Filter bad sources if (!source) { continue; } // Win32 edge case if (source->GetType() == ELoopSource::eSourceWin32) { this->bIsWinLoop_ = true; this->msgSource_ = source; this->msgCallbacks_ = AuMove(callbacks); continue; } // Handle known ILoopSourceEx handle objects if (auto extended = AuDynamicCast(source)) { ExtendeSourceInfo t {extended}; t.timeoutAbs = timeout; t.sourceBase = source; t.source = extended; t.callbacks = AuMove(callbacks); this->loopSourceExs_.push_back(t); if (extended->Singular()) { const auto handle = extended->GetHandle(); auto nthandle = reinterpret_cast(handle); this->handleArrayAnd_.push_back(nthandle); } else { for (const auto &handle : extended->GetHandles()) { auto nthandle = reinterpret_cast(handle); this->handleArrayAnd_.push_back(nthandle); } } } } AuTryClear(this->addedSources_); if (bShouldRebuild) { BuildFromAndArray(); } return true; } catch (...) { return {}; } } AuUInt32 LoopQueue::GetSourceCount() { return this->loopSourceExs_.size(); } void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) { this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS)); this->slowTickMs_ = sectionTickTime ? sectionTickTime : 1; } void LoopQueue::ChugHint(bool value) { this->forceChug_ = value; } bool LoopQueue::AddCallback(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); if (!source) { SysPushErrorArg(); return {}; } if (!subscriber) { SysPushErrorArg(); return {}; } if (source->GetType() == ELoopSource::eSourceWin32) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->msgCallbacks_.base, subscriber); } AuUInt32 count {}; for (auto &sourceEx : this->loopSourceExs_) { if (sourceEx.sourceBase != source) { continue; } AU_LOCK_GUARD(sourceEx.lock); if (!AuTryInsert(sourceEx.callbacks.base, subscriber)) { return {}; } count++; } if (!count) { AU_LOCK_GUARD(this->sourceMutex_); for (auto &[innerSource, timeout, callbacks] : this->addedSources_) { if (innerSource == source) { if (!AuTryInsert(callbacks.base, subscriber)) { return {}; } count++; } } } return count; } bool LoopQueue::AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) { AU_LOCK_GUARD(this->rwMutex_->AsReadable()); if (!source) { SysPushErrorArg(); return {}; } if (!subscriber) { SysPushErrorArg(); return {}; } if (source->GetType() == ELoopSource::eSourceWin32) { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->msgCallbacks_.extended, subscriber); } AuUInt32 count {}; for (auto &sourceEx : this->loopSourceExs_) { if (sourceEx.sourceBase != source) { continue; } AU_LOCK_GUARD(sourceEx.lock); if (!AuTryInsert(sourceEx.callbacks.extended, subscriber)) { return {}; } count++; } if (!count) { AU_LOCK_GUARD(this->sourceMutex_); for (auto &[innerSource, timeout, callbacks] : this->addedSources_) { if (innerSource == source) { if (!AuTryInsert(callbacks.extended, subscriber)) { return {}; } count++; } } } return count; } bool LoopQueue::AddCallback(const AuSPtr &subscriber) { AU_LOCK_GUARD(rwMutex_->AsReadable()); if (!subscriber) { SysPushErrorArg(); return {}; } for (auto &sourceEx : this->loopSourceExs_) { AU_LOCK_GUARD(sourceEx.lock); // hack lock, but its a spin zoomer if (!AuTryInsert(sourceEx.callbacks.base, subscriber)) { return {}; } } { AU_LOCK_GUARD(this->sourceMutex_); for (auto &[innerSource, timeout, callbacks] : this->addedSources_) { if (!AuTryInsert(callbacks.base, subscriber)) { return {}; } } } { AU_LOCK_GUARD(this->sourceMutex_); return AuTryInsert(this->msgCallbacks_.base, subscriber); } return true; } bool LoopQueue::IsSignaled() { AuUInt32 askers {}; return WaitAnyNBSpurious(0, askers, nullptr, true); } bool LoopQueue::WaitAll(AuUInt32 timeout) { // TODO: AU_LOCK_GUARD(this->rwMutex_->AsReadable()); bool bReturnStatus {true}; bool bTimeout {false}; AuUInt32 count {}; AuUInt32 index {}; if (this->handleArrayAnd_.empty()) { return true; } count = this->handleArrayAnd_.size(); AuUInt32 startTime = AuTime::CurrentInternalClockMS(); AuUInt32 endTime = startTime + timeout; for (const auto &source : this->loopSourceExs_) { source.source->OnPresleep(); } bool active = this->hEvent_ == INVALID_HANDLE_VALUE; while (count != index) { auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS)); startTime = AuTime::CurrentInternalClockMS(); if (endTime <= startTime) { StartUserAndTakeOwn(); ConsiderEvicitingTimeoutsAll(); StartUserAndTakeOwn_Release(); return false; } auto timeDelta = endTime - startTime; // TODO: cap to last obj DWORD status {}; // TODO: queue apc if (this->bIsWinLoop_) { status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL); } else { status = ::WaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, true, timeDelta, true); } if (WaitToRetStatus(status)) { index += count; // continue the sleep after succesfully passing MAXIMUM_WAIT_OBJECTS or [..., EOS] objects continue; } // Account for win32 failures if (status == WAIT_FAILED) { bReturnStatus = false; break; } // Ingore the other unrelated errors (APC notification, timeout, etc) } StartUserAndTakeOwn(); ConsiderEvicitingTimeoutsAll(); StartUserAndTakeOwn_Release(); return bReturnStatus; } AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout) { AuUInt32 ret {}; bool lastItr {}; AuUInt32 startTime = AuTime::CurrentInternalClockMS(); AuUInt32 endTime = timeout ? (startTime + timeout) : INFINITE; AuUInt32 chuggerIndex {}; do { if (WaitAnyNBSpurious(endTime, chuggerIndex, nullptr, false)) { ret++; } } while (WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return ret; } AuList> LoopQueue::WaitAnyEx(AuUInt32 timeout) { AuList> trigger; AuUInt32 startTime = AuTime::CurrentInternalClockMS(); AuUInt32 endTime = timeout ? (startTime + timeout) : INFINITE; AuUInt32 chuggerIndex {}; do { if (WaitAnyNBSpurious(endTime, chuggerIndex, &trigger, false)) { return trigger; } } while (WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0); return trigger; } bool LoopQueue::ChugWaitAny(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered) { AuUInt32 count {}; AuUInt32 &index {chuggerIndex}; count = this->handleArrayOr_.size(); indexOfTriggered = -1; if (index > count) index = 0; bool active = this->hEvent_ == INVALID_HANDLE_VALUE; while (count != index) { auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS)); if (timeout && timeout <= AuTime::CurrentInternalClockMS()) { return false; } DWORD status {}; if (this->bIsWinLoop_) { status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, this->slowTickMs_, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE); if (status == next) { return this->handleArrayOr_.size(); } } else { status = ::WaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, false, this->slowTickMs_, true); } if (WaitStatusFromAligned(status, active)) { DWORD offset = status - WAIT_OBJECT_0; indexOfTriggered = offset + index; return true; } if (status == WAIT_OBJECT_0 && !active) { return false; } if (status == WAIT_FAILED) { return false; } index += count; } return false; } bool LoopQueue::WaitAnyNBSpurious(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuList> *trigger, bool poll) { bool status {}; DWORD temp; AuUInt32 indexOfTriggered {}; AuUInt32 triggeredCount {}; AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me if (trigger) { trigger->reserve(this->loopSourceExs_.size()); } for (const auto &source : this->loopSourceExs_) { source.source->OnPresleep(); } if ((this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_)) { status = ChugWaitAny(timeout, chuggerIndex, indexOfTriggered); } else { auto now = AuTime::CurrentInternalClockMS(); AuUInt sleepDelta; if (timeout) { if (timeout <= now) { return false; } sleepDelta = timeout - now; } else { sleepDelta = 0; } if (this->bIsWinLoop_) { temp = ::MsgWaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE); } else { temp = ::WaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), false, sleepDelta, true); } status = WaitToRetStatus(temp); if (status) { indexOfTriggered = temp - WAIT_OBJECT_0; } } bool isPump = this->handleArrayOr_.size() == indexOfTriggered; StartUserAndTakeOwn(); AuUInt firstTriggered {}; if (status) { if (!isPump) { firstTriggered = reinterpret_cast(this->handleArrayOr_[indexOfTriggered]); } { Iterator queueIterator(this); AuSInt indexOffset {}; for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; ) { bool shouldRemove {false}; auto &source = *queueIterator.itr; AuUInt lastHandle {}; bool wasTriggered {}; if (source.source) { bool singular = source.source->Singular(); if (!singular) { for (const auto &handle : source.source->GetHandles()) { if ((firstTriggered == handle) || (::WaitForSingleObject(reinterpret_cast(handle), 0) == WAIT_OBJECT_0)) { lastHandle = handle; wasTriggered = true; break; } } } else { auto handle = source.source->GetHandle(); if ((firstTriggered == handle) || (::WaitForSingleObject(reinterpret_cast(handle), 0) == WAIT_OBJECT_0)) { lastHandle = firstTriggered; wasTriggered = true; } } if (wasTriggered && source.source->OnTrigger(lastHandle)) { triggeredCount++; shouldRemove = true; AU_LOCK_GUARD(const_cast(&source.lock)) // this spinlock really does feel like a hack for (const auto &handler : source.callbacks.base) { try { shouldRemove &= handler->OnFinished(source.source); } catch (...) { SysPushErrorCatch(); } } if (shouldRemove) { for (const auto &handler : source.callbacks.extended) { try { shouldRemove &= handler->OnFinished(source.source); } catch (...) { SysPushErrorCatch(); } } } if ((source.callbacks.base.empty()) && (source.callbacks.extended.empty())) { shouldRemove = false; } } } if (shouldRemove) { if (trigger) { AuTryInsert(*trigger, source.source); } } if (source.ConsiderTimeout()) { shouldRemove = true; } source.source->OnFinishSleep(); if (shouldRemove) { if (source.source->GetType() == ELoopSource::eSourceWin32) { SysPanic("?"); } else { AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size()); int sOffset = -(uHandles); if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) { this->RemoveSourceNB(source.source); this->willCommitInFuture_ = true; } else { AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles); AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles); queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); continue; } } } //else { queueIterator.Next(); } } } } else { ConsiderEvicitingTimeoutsAll(); } StartUserAndTakeOwn_Release(); if (isPump) { triggeredCount++; // Notify all and remove if unwanted { AU_LOCK_GUARD(this->sourceMutex_); if (trigger) { AuTryInsert(*trigger, this->msgSource_); } bool shouldRemove {true}; for (const auto &handler : this->msgCallbacks_.base) { try { shouldRemove &= handler->OnFinished(this->msgSource_); } catch (...) { SysPushErrorCatch(); } } if (shouldRemove) { this->bIsWinLoop_ = false; } } // Do the work of pumping the Win32 event queue if the user didn't flush the queue during the OnFinished callback { if (AuStaticCast(this->msgSource_)->bIsPumping_) { MSG msg; try { while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE)) { TranslateMessage(&msg); DispatchMessageW(&msg); } } catch (...) { SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception."); } } } } return triggeredCount; } void LoopQueue::StartUserAndTakeOwn() { this->rwMutex_->UpgradeReadToWrite(0); this->isCommitableInFuture_ = true; } void LoopQueue::StartUserAndTakeOwn_Release() { this->isCommitableInFuture_ = false; if (this->willCommitInFuture_) { this->CommitLocked(); } this->rwMutex_->DowngradeWriteToRead(); } // callee must own handle array void LoopQueue::ConsiderEvicitingTimeoutsAll() { Iterator queueIterator(this); bool bRebuildFromAnd {}; for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; ) { auto &source = *queueIterator.itr; if (!source.ConsiderTimeout()) { queueIterator.Next(); source.source->OnFinishSleep(); continue; } source.source->OnFinishSleep(); if (source.source->GetType() == ELoopSource::eSourceWin32) { // Null message loop hack this->msgSource_.reset(); this->msgCallbacks_ = {}; this->bIsWinLoop_ = false; } else { auto count = source.source->Singular() ? 1 : source.source->GetHandles().size(); if (this->handleArrayOr_.size() <= 64) { AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count); AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } else { bRebuildFromAnd = true; AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count); } queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr)); break; } queueIterator.Next(); } if (bRebuildFromAnd) { BuildFromAndArray(); } } AUKN_SYM AuSPtr NewLoopQueue() { return AuMakeShared(); } }