204 lines
7.1 KiB
C++
204 lines
7.1 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: LoopQueue.NT.hpp
|
|
Date: 2022-2-1W2
|
|
Author: Reece
|
|
***/
|
|
#pragma once
|
|
|
|
#include "ILoopSourceEx.hpp"
|
|
#include "LoopQueue.hpp"
|
|
|
|
namespace Aurora::IO::Loop
|
|
{
|
|
struct LoopQueue : ILoopQueue//, ILoopEpilogueHook
|
|
{
|
|
LoopQueue();
|
|
~LoopQueue();
|
|
|
|
bool Init();
|
|
|
|
bool AddHook(const AuFunction<void()> &func);// override;
|
|
void PumpHooks();
|
|
|
|
bool SourceAdd(const AuSPtr<ILoopSource> &source) override;
|
|
bool SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 ms) override;
|
|
bool SourceRemove(const AuSPtr<ILoopSource> &source) override;
|
|
|
|
AuUInt32 GetSourceCount() override;
|
|
|
|
virtual bool AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber) override;
|
|
virtual bool AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber) override;
|
|
virtual bool AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber) override;
|
|
|
|
void ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount) override;
|
|
void ChugHint(bool value) override;
|
|
|
|
virtual bool Commit() override;
|
|
|
|
bool IsSignaledPeek() override;
|
|
AuUInt32 PumpNonblocking() override;
|
|
AuList<AuSPtr<ILoopSource>> PumpNonblockingEx() override;
|
|
|
|
bool WaitAll(AuUInt32 timeout) override;
|
|
bool WaitAllEx(AuUInt32 timeout, bool &bFinished);
|
|
AuUInt32 WaitAny(AuUInt32 timeout) override;
|
|
AuList<AuSPtr<ILoopSource>> WaitAnyEx(AuUInt32 timeout) override;
|
|
|
|
void Sync();
|
|
bool Sync2();
|
|
void Unlock();
|
|
bool bCheckForDups {};
|
|
bool bCheckForForce {};
|
|
|
|
private:
|
|
|
|
void BuildFromAndArray();
|
|
|
|
void ConsiderEvicitingTimeoutsAll();
|
|
bool CommitLocked();
|
|
|
|
struct SourceCallbacks
|
|
{
|
|
AuList<AuSPtr<ILoopSourceSubscriber>> base;
|
|
AuList<AuSPtr<ILoopSourceSubscriberEx>> extended;
|
|
};
|
|
|
|
struct ExtendedSourceInfo
|
|
{
|
|
ExtendedSourceInfo(const AuSPtr<ILoopSourceEx> &in);
|
|
|
|
AuSPtr<ILoopSourceEx> source;
|
|
AuSPtr<ILoopSource> sourceBase;
|
|
AuThreadPrimitives::SpinLock lock; // im too brain dead to think of a solution to the AddCallback thread safety issue, so i'm going to optimize Wait[...]s subscriber lookup with filtered ccaches in here, protected by this spinlock
|
|
AuUInt64 timeoutAbs {};
|
|
LoopQueue *parent {};
|
|
SourceCallbacks callbacks;
|
|
bool invalidateItr {};
|
|
bool ConsiderTimeout(AuUInt64 time) const;
|
|
AuPair<bool, bool> DoWork(LoopQueue* queue, AuUInt handle);
|
|
};
|
|
|
|
bool IsValid();
|
|
bool RemoveSourceNB(const AuSPtr<ILoopSource> &source);
|
|
bool WaitAnyNBSpurious(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll, bool &bTriggerWin32);
|
|
bool WaitAnyNBSpuriousEx(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll, bool &bTriggerWin32, bool &bFinished);
|
|
bool ChugWaitAny(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuUInt32 &offset);
|
|
|
|
bool StartUserAndTakeOwn();
|
|
void StartUserAndTakeOwn_Release();
|
|
|
|
bool TryPumpWin32();
|
|
|
|
//
|
|
SourceCallbacks msgCallbacks_;
|
|
AuList<AuSPtr<ILoopSource>> removedSources_;
|
|
AuThreadPrimitives::CriticalSection sourceMutex_;
|
|
AuList<AuTuple<AuSPtr<ILoopSource>, AuUInt32, SourceCallbacks>> addedSources_;
|
|
//
|
|
AuList<AuSPtr<ILoopSourceSubscriber>> globalSubcribers_;
|
|
|
|
// OS Cache (working around MAXIMUM_WAIT_OBJECTS, no poll, etc)
|
|
AuThreadPrimitives::RWRenterableLock rwMutex_;
|
|
bool bIsWinLoop_ {};
|
|
bool bIsThreadSafe_ {};
|
|
AuList<HANDLE> handleArrayOr_;
|
|
bool bIOUWin32FastPath {};
|
|
AuList<HANDLE> handleArrayAnd_;
|
|
AuList<AuTuple<int, int, HANDLE>> handleArrayTainted_;
|
|
AuUInt32 slowTickMs_ {1};
|
|
AuUInt32 slowTickRate_ {MAXIMUM_WAIT_OBJECTS};
|
|
bool forceChug_ {};
|
|
bool isCommitableInFuture_ {};
|
|
bool willCommitInFuture_ {};
|
|
|
|
AuUInt32 uNestedWriteLock_ {};
|
|
|
|
// we're only supporting one msg AddSource for now
|
|
AuSPtr<ILoopSource> msgSource_;
|
|
|
|
// reusable internal handles
|
|
HANDLE hEvent_ {INVALID_HANDLE_VALUE};
|
|
HANDLE hDummy_ {INVALID_HANDLE_VALUE};
|
|
|
|
// special
|
|
AuList<ExtendedSourceInfo> loopSourceExs_;
|
|
bool bLargeCommitQueueDirty {};
|
|
|
|
struct Iterator
|
|
{
|
|
LoopQueue *base;
|
|
|
|
inline Iterator(LoopQueue *base) : base(base)
|
|
{
|
|
}
|
|
|
|
inline auto End()
|
|
{
|
|
return this->base->loopSourceExs_.end();
|
|
}
|
|
|
|
inline void Start()
|
|
{
|
|
this->startingIndexAnd = 0;
|
|
this->startingIndexOr = 1;
|
|
itr = this->base->loopSourceExs_.begin();
|
|
}
|
|
|
|
inline void Next()
|
|
{
|
|
Increment();
|
|
itr++;
|
|
}
|
|
|
|
template<class T>
|
|
inline void Delete(T &&itr)
|
|
{
|
|
this->itr = AuMove(itr);
|
|
}
|
|
|
|
inline void Increment()
|
|
{
|
|
if (itr != End())
|
|
{
|
|
if (itr->source->Singular())
|
|
{
|
|
startingIndexOr++;
|
|
startingIndexAnd++;
|
|
|
|
if ((startingIndexOr % 64) == 0)
|
|
{
|
|
startingIndexOr++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// TODO: this is gross and slow
|
|
for (int i = 0; i < itr->source->GetHandles().size(); i++)
|
|
{
|
|
startingIndexOr += i;
|
|
startingIndexAnd += i;
|
|
|
|
if ((startingIndexOr % 64) == 0)
|
|
{
|
|
startingIndexOr++;
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
AuUInt startingIndexOr {};
|
|
AuUInt startingIndexAnd {};
|
|
|
|
AuToIterator_t<AuList<ExtendedSourceInfo>> itr;
|
|
|
|
};
|
|
|
|
AuList<AuFunction<void()>> epilogueHooks_;
|
|
|
|
friend struct Iteartor;
|
|
};
|
|
} |