/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: LoopQueue.Linux.hpp Date: 2022-4-4 Author: Reece ***/ #pragma once #include "ILoopSourceEx.hpp" #include "LSEvent.hpp" namespace Aurora::IO::Loop { struct LoopQueue : ILoopQueue { LoopQueue(); ~LoopQueue(); bool AddHook(const AuFunction &func); bool Init(); void Deinit(); bool SourceAdd(const AuSPtr &source) override; bool SourceAddWithTimeout(const AuSPtr &source, AuUInt32 ms) override; bool SourceAddWithTimeoutEx(const AuSPtr &source, AuUInt32 ms) ; bool SourceRemove(const AuSPtr &source) override; AuUInt32 GetSourceCount() override; virtual bool AddCallback(const AuSPtr &source, const AuSPtr &subscriber) override; virtual bool AddCallbackEx(const AuSPtr &source, const AuSPtr &subscriber) override; virtual bool AddCallback(const AuSPtr &subscriber) override; void ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) override; void ChugHint(bool value) override; virtual bool Commit() override; bool IsSignaledPeek() override; AuUInt32 PumpNonblocking() override; AuList> PumpNonblockingEx() override; bool WaitAll(AuUInt32 timeout) override; AuUInt32 WaitAny(AuUInt32 timeout) override; AuList> WaitAnyEx(AuUInt32 timeout) override; AuUInt32 DoTick(AuUInt64, AuList> *optOut = nullptr, bool *tryAgain = nullptr, bool async = false); private: void PumpHooks(); bool CommitDecommit(); struct SourceExtended { SourceExtended(LoopQueue *parent, const AuSPtr &source); ~SourceExtended(); void Deinit(); void Commit(const AuSPtr &self); AuSPtr source; ILoopSourceEx *sourceExtended {}; LoopQueue *parent {}; AuWPtr pin; AuUInt64 timeoutAbs {}; bool ConsiderTimeout(AuUInt64 time) const { if ((timeoutAbs) && (time >= timeoutAbs)) { for (const auto &handler : subscriberExs) { try { handler->OnTimeout(source); } catch (...) { SysPushErrorCatch(); } } return true; } return false; } AuList> subscribers; AuList> subscriberExs; bool bHasCommited {}; // ticked, should remove AuTuple DoWork(int fd); AuTuple DoWork(bool read, bool write); }; struct AnEpoll { LoopQueue *parent {}; AuThreadPrimitives::SpinLock lock; AuBST startingWorkRead; AuBST startingWorkWrite; void Add(SourceExtended *source); void Remove(SourceExtended *source, bool readData, bool writeData); }; int epollFd_{ -1 }; LSEvent lockStealer_; AuThreadPrimitives::SpinLock commitQueueMutex_; AuList, AuSPtr, AuSPtr>> commitPending_; AuList> decommitQueue_; AuThreadPrimitives::SpinLock globalLockMutex_; AuList> allSubscribers_; AuThreadPrimitives::RWLockUnique_t sourceMutex_; AuList> sources_; AuThreadPrimitives::RWLockUnique_t polledItemsMutex_; AuList alternativeEpolls_; AuList> epilogueHooks_; AuList, AuUInt32>> pendingBlocking_; bool bRecommitLater {}; AnEpoll globalEpoll_; }; }