/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: LoopQueue.NT.hpp Date: 2022-2-1W2 Author: Reece ***/ #pragma once #include "ILoopSourceEx.hpp" namespace Aurora::Loop { struct LoopQueue : ILoopQueue { LoopQueue(); ~LoopQueue(); bool SourceAdd(const AuSPtr &source) override; bool SourceAddWithTimeout(const AuSPtr &source, AuUInt32 ms) override; bool SourceRemove(const AuSPtr &source) override; AuUInt32 GetSourceCount() override; virtual bool AddCallback(const AuSPtr &source, const AuSPtr &subscriber) override; virtual bool AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) override; virtual bool AddCallback(const AuSPtr &subscriber) override; void ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) override; void ChugHint(bool value) override; virtual bool Commit() override; bool IsSignaled() override; bool WaitAll(AuUInt32 timeout) override; AuUInt32 WaitAny(AuUInt32 timeout) override; AuList> WaitAnyEx(AuUInt32 timeout) override; void Sync(); void Unlock(); private: void BuildFromAndArray(); void ConsiderEvicitingTimeoutsAll(); bool CommitLocked(); struct SourceCallbacks { AuList> base; AuList> extended; }; struct ExtendeSourceInfo { inline ExtendeSourceInfo(AuSPtr in) : source(in) {} AuSPtr source; AuSPtr sourceBase; AuThreadPrimitives::SpinLock lock; // im too brain dead to think of a solution to the AddCallback thread safety issue, so i'm going to optimize Wait[...]s subscriber lookup with filtered ccaches in here, protected by this spinlock AuUInt32 timeoutAbs; SourceCallbacks callbacks; bool ConsiderTimeout() const { if ((timeoutAbs) && (AuTime::CurrentInternalClockMS() >= timeoutAbs)) { for (const auto &handler : callbacks.extended) { try { handler->OnTimeout(sourceBase); } catch (...) { SysPushErrorCatch(); } } return true; } return false; } }; bool IsValid(); bool RemoveSourceNB(const AuSPtr &source); bool WaitAnyNBSpurious(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuList> *trigger, bool poll); bool ChugWaitAny(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuUInt32 &offset); void StartUserAndTakeOwn(); void StartUserAndTakeOwn_Release(); // SourceCallbacks msgCallbacks_; AuList> removedSources_; AuThreadPrimitives::SpinLock sourceMutex_; AuList, AuUInt32, SourceCallbacks>> addedSources_; // // OS Cache AuThreadPrimitives::RWLockUnique_t rwMutex_ = AuThreadPrimitives::RWLockUnique(); bool bIsWinLoop_ {}; bool bIsThreadSafe_ {}; AuList handleArrayOr_; AuList handleArrayAnd_; AuUInt32 slowTickMs_ {1}; AuUInt32 slowTickRate_ {MAXIMUM_WAIT_OBJECTS}; bool forceChug_ {}; bool isCommitableInFuture_ {}; bool willCommitInFuture_ {}; AuSPtr msgSource_; HANDLE hEvent_ {INVALID_HANDLE_VALUE}; AuList loopSourceExs_; bool bLargeCommitQueueDirty {}; struct Iterator { LoopQueue *base; inline Iterator(LoopQueue *base) : base(base) { } inline auto End() { return this->base->loopSourceExs_.end(); } inline void Start() { this->startingIndexAnd = 0; this->startingIndexOr = 1; itr = this->base->loopSourceExs_.begin(); } inline void Next() { Increment(); itr++; } template inline void Delete(T &&itr) { this->itr = AuMove(itr); } inline void Increment() { if (itr != End()) { if (itr->source->Singular()) { startingIndexOr++; startingIndexAnd++; if ((startingIndexOr % 64) == 0) { startingIndexOr++; } } else { // TODO: this is gross and slow for (int i = 0; i < itr->source->GetHandles().size(); i++) { startingIndexOr += i; startingIndexAnd += i; if ((startingIndexOr % 64) == 0) { startingIndexOr++; } } } } } AuUInt startingIndexOr {}; AuUInt startingIndexAnd {}; AuToIterator_t> itr; }; friend struct Iteartor; }; }