Reece Wilson
7be2d3fbdc
[+] AuNet::ISocketStats [+] AuNet::ISocketChannel::GetRecvStats() [+] AuNet::ISocketChannel::GetSendStats() [+] AuIO::IOProcessor::RunTickEx(AuUInt32 dwTimeout) [*] Refactor clock APIs [+] Documentation in headers [+] AuIO::IIOPipeWork::GetStartTickMS() [+] AuIO::IIOPipeWork::GetLastTickMS() [+] AuIO::IIOPipeWork::GetPredictedThroughput() [+] AuIO::IIOPipeWork::GetBytesProcessed()
1319 lines
38 KiB
C++
1319 lines
38 KiB
C++
/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: LoopQueue.NT.cpp
    Date: 2022-1-8
    Author: Reece
    Note: Win32 sewage lmao
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "Loop.NT.hpp"
|
|
#include "ILoopSourceEx.hpp"
|
|
#include "LSWin32.NT.hpp"
|
|
#include "LoopQueue.NT.hpp"
|
|
|
|
// Returns true when a Win32 wait result is anything other than a timeout,
// an APC/IO-completion wake up, or an outright failure.
static bool WaitToRetStatus(DWORD ret)
{
    switch (ret)
    {
        case WAIT_TIMEOUT:
        case WAIT_IO_COMPLETION:
        case WAIT_FAILED:
            return false;

        default:
            return true;
    }
}
|
|
|
|
// Like WaitToRetStatus, but for wait arrays whose first slot is the
// lock-releaser event: unless `active` is set, a wake up on slot zero
// (WAIT_OBJECT_0) is not reported as a real trigger.
static bool WaitStatusFromAligned(DWORD ret, bool active)
{
    if (!WaitToRetStatus(ret))
    {
        return false;
    }

    if (active)
    {
        return true;
    }

    return ret != WAIT_OBJECT_0 /*lock releaser*/;
}
|
|
|
|
namespace Aurora::IO::Loop
|
|
{
|
|
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
// Loop Queue :: Extended Handle
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Wraps an extended loop source; a zero timeoutAbs means "no deadline"
// (see ConsiderTimeout).
LoopQueue::ExtendedSourceInfo::ExtendedSourceInfo(const AuSPtr<ILoopSourceEx> &in) :
    source(in),
    timeoutAbs(0)
{
}
|
|
|
|
// Checks whether this source's absolute deadline has passed at `time`.
// Returns false when no deadline is set (timeoutAbs == 0) or it has not been
// reached yet. Otherwise every extended subscriber gets OnTimeout() and true
// is returned; exceptions thrown by subscribers are pushed to the error stack.
bool LoopQueue::ExtendedSourceInfo::ConsiderTimeout(AuUInt64 time) const
{
    if ((!this->timeoutAbs) || (time < this->timeoutAbs))
    {
        return false;
    }

    for (const auto &subscriber : callbacks.extended)
    {
        try
        {
            subscriber->OnTimeout(sourceBase);
        }
        catch (...)
        {
            SysPushErrorCatch();
        }
    }

    return true;
}
|
|
|
|
// Dispatches a wake up of this source to its subscribers.
//
// queue:  owning queue; its global subscribers are consulted last.
// handle: the native handle that fired, or -1 to skip the OnTrigger() check.
//
// Returns {ticked, shouldRemove}. A default-constructed pair is returned when
// the source rejects the trigger via OnTrigger(). Subscribers whose
// OnFinished() returns true are unsubscribed; the source is only reported as
// removable when every subscriber (and every global subscriber) agreed.
AuPair<bool, bool> LoopQueue::ExtendedSourceInfo::DoWork(LoopQueue *queue, AuUInt handle)
{
    bool bShouldRemove {true};
    AuUInt8 uPosition {};

    if (this->source && handle != -1)
    {
        if (!this->source->OnTrigger(handle))
        {
            return {};
        }
    }

    bool bOverload {};

    {
        AU_LOCK_GUARD(&this->lock);

        if ((this->callbacks.base.empty()) &&
            (this->callbacks.extended.empty()))
        {
            bOverload = true;
        }

        // Notify callbacks...
        for (auto itr = this->callbacks.extended.begin();
             itr != this->callbacks.extended.end(); )
        {
            // Must be initialised: if the handler throws, the value is still
            // read below. A throwing handler stays subscribed and vetoes removal.
            bool result {false};
            auto handler = *itr;

            try
            {
                result = handler->OnFinished(this->sourceBase, uPosition++);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }

            bShouldRemove &= result;

            if (result)
            {
                itr = this->callbacks.extended.erase(itr);
            }
            else
            {
                itr++;
            }
        }

        for (auto itr = this->callbacks.base.begin();
             itr != this->callbacks.base.end(); )
        {
            // See above: never read an indeterminate value after a throw.
            bool result {false};
            auto handler = *itr;

            try
            {
                result = handler->OnFinished(this->sourceBase);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }

            bShouldRemove &= result;

            if (result)
            {
                itr = this->callbacks.base.erase(itr);
            }
            else
            {
                itr++;
            }
        }
    }

    // Evict when subs count hit zero, not when sub count started off at zero
    if (bOverload)
    {
        bShouldRemove = false;
    }

    // Notify global subscribers, allowing them to preempt removal
    if (bShouldRemove || bOverload)
    {
        AU_LOCK_GUARD(queue->sourceMutex_);
        for (const auto &handler : queue->globalSubcribers_)
        {
            try
            {
                bShouldRemove &= handler->OnFinished(this->sourceBase);
            }
            catch (...)
            {
                SysPushErrorCatch();
            }
        }
    }

    return AuMakePair(true, bShouldRemove);
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
// Loop Queue
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Creates the two manual-reset events used by the queue:
//  - hEvent_: initially unsignalled; raised by Sync() to kick waiters out of
//             their sleep so the writer can take the lock.
//  - hDummy_: initially signalled; used as a stand-in handle in the wait arrays.
LoopQueue::LoopQueue()
{
    this->hEvent_ = ::CreateEventW(NULL, true, false, NULL);
    this->hDummy_ = ::CreateEventW(NULL, true, true, NULL);

    // CreateEventW reports failure with NULL, whereas the rest of this file
    // (IsValid, Sync, ChugWaitAny) tests hEvent_ against INVALID_HANDLE_VALUE.
    // Normalise so those checks actually detect a failed creation.
    if (!this->hEvent_)
    {
        this->hEvent_ = INVALID_HANDLE_VALUE;
    }
}
|
|
|
|
// Releases the two events created by the constructor.
LoopQueue::~LoopQueue()
{
    AuWin32CloseHandle(this->hEvent_);
    AuWin32CloseHandle(this->hDummy_);
}
|
|
|
|
bool LoopQueue::IsValid()
|
|
{
|
|
return this->hEvent_ != INVALID_HANDLE_VALUE && this->handleArrayOr_.size() && this->handleArrayAnd_.size();
|
|
}
|
|
|
|
struct LoopJoinable
|
|
{
|
|
LoopQueue *that;
|
|
void Lock()
|
|
{
|
|
that->Sync();
|
|
}
|
|
|
|
void Unlock()
|
|
{
|
|
that->Unlock();
|
|
}
|
|
};
|
|
|
|
void LoopQueue::Sync()
|
|
{
|
|
if (this->hEvent_ == INVALID_HANDLE_VALUE)
|
|
{
|
|
this->rwMutex_->AsWritable()->Lock();
|
|
return;
|
|
}
|
|
|
|
::SetEvent(this->hEvent_);
|
|
|
|
this->rwMutex_->AsWritable()->Lock();
|
|
|
|
::ResetEvent(this->hEvent_);
|
|
}
|
|
|
|
void LoopQueue::Unlock()
|
|
{
|
|
this->rwMutex_->AsWritable()->Unlock();
|
|
}
|
|
|
|
bool LoopQueue::SourceAdd(const AuSPtr<ILoopSource> &source)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
|
|
if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, 0, SourceCallbacks {})))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool LoopQueue::SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 maxTimeout)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
|
|
if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, AuTime::SteadyClockMS() + maxTimeout, SourceCallbacks {})))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
return AuTryInsert(this->removedSources_, source) || AuTryRemoveByTupleN<0>(this->addedSources_, source);
|
|
}
|
|
|
|
bool LoopQueue::RemoveSourceNB(const AuSPtr<ILoopSource> &source)
|
|
{
|
|
if (source->GetType() == ELoopSource::eSourceWin32)
|
|
{
|
|
this->msgSource_.reset();
|
|
this->msgCallbacks_ = {};
|
|
this->bIsWinLoop_ = false;
|
|
return {};
|
|
}
|
|
|
|
bool bRebuildFromAnd {};
|
|
|
|
Iterator queueIterator(this);
|
|
for (queueIterator.Start();
|
|
queueIterator.End() != queueIterator.itr; )
|
|
{
|
|
if (queueIterator.itr->sourceBase == source)
|
|
{
|
|
if (queueIterator.itr->source)
|
|
{
|
|
auto count = queueIterator.itr->source->Singular() ? 1 : queueIterator.itr->source->GetHandles().size();
|
|
|
|
if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS)
|
|
{
|
|
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
|
|
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
|
|
}
|
|
else
|
|
{
|
|
bRebuildFromAnd = true;
|
|
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
|
|
}
|
|
|
|
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
|
|
break;
|
|
}
|
|
}
|
|
|
|
queueIterator.Next();
|
|
}
|
|
|
|
return bRebuildFromAnd;
|
|
}
|
|
|
|
bool LoopQueue::Commit()
|
|
{
|
|
if (this->isCommitableInFuture_)
|
|
{
|
|
this->willCommitInFuture_ = true;
|
|
return true;
|
|
}
|
|
|
|
LoopJoinable joinable {this};
|
|
AU_LOCK_GUARD(joinable);
|
|
|
|
return CommitLocked();
|
|
}
|
|
|
|
void LoopQueue::BuildFromAndArray()
|
|
{
|
|
this->handleArrayOr_.clear();
|
|
|
|
this->handleArrayOr_.reserve(this->handleArrayAnd_.size() + 16);
|
|
|
|
for (int i = 0; i < this->handleArrayAnd_.size(); i++)
|
|
{
|
|
if (((this->handleArrayOr_.size() % MAXIMUM_WAIT_OBJECTS)) == 0)
|
|
{
|
|
this->handleArrayOr_.push_back(this->hEvent_);
|
|
}
|
|
|
|
this->handleArrayOr_.push_back(this->handleArrayAnd_[i]);
|
|
}
|
|
}
|
|
|
|
bool LoopQueue::CommitLocked()
|
|
{
|
|
try
|
|
{
|
|
bool bShouldRebuild {};
|
|
|
|
for (const auto &re : this->removedSources_)
|
|
{
|
|
bShouldRebuild |= this->RemoveSourceNB(re);
|
|
}
|
|
|
|
this->removedSources_.clear();
|
|
|
|
// Reserve the cache arrays for initialization
|
|
this->loopSourceExs_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
|
|
this->handleArrayAnd_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
|
|
this->handleArrayOr_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
|
|
|
|
bShouldRebuild |= bool(this->addedSources_.size());
|
|
|
|
//
|
|
for (auto &[source, timeout, callbacks] : this->addedSources_)
|
|
{
|
|
// Filter bad sources
|
|
if (!source)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
// Win32 edge case
|
|
if (source->GetType() == ELoopSource::eSourceWin32)
|
|
{
|
|
this->bIsWinLoop_ = true;
|
|
this->msgSource_ = source;
|
|
this->msgCallbacks_ = AuMove(callbacks);
|
|
continue;
|
|
}
|
|
|
|
// Handle known ILoopSourceEx handle objects
|
|
if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
|
|
{
|
|
ExtendedSourceInfo t {extended};
|
|
t.timeoutAbs = timeout;
|
|
t.sourceBase = source;
|
|
t.source = extended;
|
|
t.callbacks = AuMove(callbacks);
|
|
this->loopSourceExs_.push_back(t);
|
|
|
|
if (extended->Singular())
|
|
{
|
|
const auto handle = extended->GetHandle();
|
|
auto nthandle = reinterpret_cast<HANDLE>(handle);
|
|
this->handleArrayAnd_.push_back(nthandle);
|
|
}
|
|
else
|
|
{
|
|
for (const auto &handle : extended->GetHandles())
|
|
{
|
|
auto nthandle = reinterpret_cast<HANDLE>(handle);
|
|
this->handleArrayAnd_.push_back(nthandle);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
AuTryClear(this->addedSources_);
|
|
|
|
if (bShouldRebuild)
|
|
{
|
|
BuildFromAndArray();
|
|
}
|
|
|
|
return true;
|
|
}
|
|
catch (...)
|
|
{
|
|
return {};
|
|
}
|
|
}
|
|
|
|
// Number of committed extended sources (the Win32 message source and
// not-yet-committed additions are not part of loopSourceExs_).
AuUInt32 LoopQueue::GetSourceCount()
{
    const auto uCount = this->loopSourceExs_.size();
    return uCount;
}
|
|
|
|
// Configures the slow ("chug") wait path.
// sectionTickTime:   per-section wait in milliseconds; zero is replaced by 1.
// sectionDequeCount: stored as the slow tick rate, capped at MAXIMUM_WAIT_OBJECTS.
void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount)
{
    this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS));

    if (sectionTickTime)
    {
        this->slowTickMs_ = sectionTickTime;
    }
    else
    {
        this->slowTickMs_ = 1;
    }
}
|
|
|
|
void LoopQueue::ChugHint(bool value)
|
|
{
|
|
this->forceChug_ = value;
|
|
}
|
|
|
|
bool LoopQueue::AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber)
|
|
{
|
|
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
|
|
|
|
if (!source)
|
|
{
|
|
SysPushErrorArg();
|
|
return {};
|
|
}
|
|
|
|
if (!subscriber)
|
|
{
|
|
SysPushErrorArg();
|
|
return {};
|
|
}
|
|
|
|
|
|
AuUInt32 count {};
|
|
for (auto &sourceEx : this->loopSourceExs_)
|
|
{
|
|
if (sourceEx.sourceBase != source)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
AU_LOCK_GUARD(sourceEx.lock);
|
|
if (!AuTryInsert(sourceEx.callbacks.base, subscriber))
|
|
{
|
|
return {};
|
|
}
|
|
|
|
count++;
|
|
}
|
|
|
|
if (!count)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
|
|
{
|
|
if (innerSource == source)
|
|
{
|
|
if (!AuTryInsert(callbacks.base, subscriber))
|
|
{
|
|
return {};
|
|
}
|
|
count++;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (source->GetType() == ELoopSource::eSourceWin32)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
return AuTryInsert(this->msgCallbacks_.base, subscriber);
|
|
}
|
|
|
|
return count;
|
|
}
|
|
|
|
bool LoopQueue::AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber)
|
|
{
|
|
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
|
|
|
|
if (!source)
|
|
{
|
|
SysPushErrorArg();
|
|
return {};
|
|
}
|
|
|
|
if (!subscriber)
|
|
{
|
|
SysPushErrorArg();
|
|
return {};
|
|
}
|
|
|
|
|
|
AuUInt32 count {};
|
|
for (auto &sourceEx : this->loopSourceExs_)
|
|
{
|
|
if (sourceEx.sourceBase != source)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
// IMPORTANT: Soft-locks on this line, whose stack traces contain ExtendedSourceInfo::DoWork, are
|
|
// a re-add during io-tick before removal condition.
|
|
AU_LOCK_GUARD(sourceEx.lock);
|
|
if (!AuTryInsert(sourceEx.callbacks.extended, subscriber))
|
|
{
|
|
return {};
|
|
}
|
|
|
|
count++;
|
|
}
|
|
|
|
if (!count)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
|
|
{
|
|
if (innerSource == source)
|
|
{
|
|
if (!AuTryInsert(callbacks.extended, subscriber))
|
|
{
|
|
return {};
|
|
}
|
|
count++;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (source->GetType() == ELoopSource::eSourceWin32)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
return AuTryInsert(this->msgCallbacks_.extended, subscriber);
|
|
}
|
|
return count;
|
|
}
|
|
|
|
bool LoopQueue::AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber)
|
|
{
|
|
AU_LOCK_GUARD(rwMutex_->AsReadable());
|
|
|
|
if (!subscriber)
|
|
{
|
|
SysPushErrorArg();
|
|
return {};
|
|
}
|
|
|
|
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
return AuTryInsert(this->globalSubcribers_, subscriber);
|
|
}
|
|
|
|
bool LoopQueue::IsSignaledPeek()
|
|
{
|
|
AuUInt32 askers {};
|
|
return WaitAnyNBSpurious(0, askers, nullptr, true);
|
|
}
|
|
|
|
// Waits until every handle in the AND array is signalled, then dispatches all
// sources.
//
// timeout: milliseconds; zero means "no deadline".
// Returns true immediately when there is nothing to wait on, false when the
// deadline passed before all handles were signalled, and otherwise whether
// any source was ticked by DoWork().
//
// The handles are waited on in batches of at most MAXIMUM_WAIT_OBJECTS.
bool LoopQueue::WaitAll(AuUInt32 timeout)
{
    AU_LOCK_GUARD(this->rwMutex_->AsReadable());

    bool bReturnStatus {false};
    AuUInt32 count {};
    AuUInt32 index {};

    if (this->handleArrayAnd_.empty())
    {
        return true;
    }

    count = this->handleArrayAnd_.size();

    AuUInt64 startTime = AuTime::HighResClockMS();
    AuUInt64 endTime = startTime + timeout;

    for (const auto &source : this->loopSourceExs_)
    {
        source.source->OnPresleep();
    }

    while (count != index)
    {
        auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));

        startTime = AuTime::HighResClockMS();

        if (timeout)
        {
            if (endTime <= startTime)
            {
                // Deadline reached: give timed-out sources a chance to be evicted.
                StartUserAndTakeOwn();
                ConsiderEvicitingTimeoutsAll();
                StartUserAndTakeOwn_Release();
                return false;
            }
        }

        // Without a deadline wait indefinitely, rather than relying on the
        // unsigned wrap-around of (endTime - startTime).
        const DWORD dwWaitMS = timeout ? DWORD(endTime - startTime) : INFINITE; // TODO: cap to last obj

        DWORD status {};

        do
        {
            if (this->bIsWinLoop_)
            {
                status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, dwWaitMS, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL);
            }
            else
            {
                status = ::WaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, true, dwWaitMS, true);
            }
        }
        while (status == WAIT_IO_COMPLETION);

        if (status == WAIT_OBJECT_0 + next)
        {
            // Window message input; pump and wait on the same batch again.
            TryPumpWin32();
            continue;
        }

        if (WaitToRetStatus(status))
        {
            // In wait-all mode a success code means that *every* object of the
            // batch is signalled, so the whole batch is done. Advancing by
            // (status + 1) would re-wait on objects that were already consumed
            // and would overshoot `count` for WAIT_ABANDONED_0 based codes.
            index += next;
            continue;
        }

        // Account for win32 failures
        if (status == WAIT_FAILED)
        {
            bReturnStatus = false;
            break;
        }

        // Ignore the other unrelated errors (APC notification, timeout, etc)
    }

    // OK - All signals are set

    // Take ownership of the queue ready for a potential purge of objects
    StartUserAndTakeOwn();

    // Le great iterate
    Iterator queueIterator(this);
    auto now = AuTime::SteadyClockMS();
    for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
    {
        auto &source = *queueIterator.itr;

        auto [ticked, bShouldRemove] = source.DoWork(this, source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]);

        if (!bShouldRemove && source.ConsiderTimeout(now))
        {
            bShouldRemove = true;
        }

        source.source->OnFinishSleep();

        if (ticked)
        {
            bReturnStatus |= true;
        }

        // Undo a hDummy_ substitution left behind by a poll pass
        if (this->handleArrayOr_[queueIterator.startingIndexOr] == AuReinterpretCast<HANDLE>(this->hDummy_))
        {
            auto handle = AuReinterpretCast<HANDLE>(source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]);
            this->handleArrayAnd_[queueIterator.startingIndexAnd] = handle;
            this->handleArrayOr_[queueIterator.startingIndexOr] = handle;
        }

        if (bShouldRemove)
        {
            if (source.source->GetType() == ELoopSource::eSourceWin32)
            {
                SysPanic("?");
            }
            else
            {
                AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size());

                if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS)
                {
                    // NOTE(review): RemoveSourceNB erases from loopSourceExs_
                    // while this loop still holds an iterator into it - verify.
                    this->RemoveSourceNB(source.source);
                    this->willCommitInFuture_ = true;
                }
                else
                {
                    AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, uHandles);
                    AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, uHandles);
                    queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
                    continue;
                }
            }
        }

        queueIterator.Next();
    }

    StartUserAndTakeOwn_Release();

    return bReturnStatus;
}
|
|
|
|
// Waits for any source to fire and returns how many wait passes reported a
// trigger. timeout is in milliseconds; zero means "no deadline". The pass is
// repeated for as long as the wake-up event hEvent_ is found signalled.
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout)
{
    AuUInt32 uTriggered {};

    const AuUInt64 uStart = AuTime::HighResClockMS();
    const AuUInt64 uDeadline = timeout ? (uStart + timeout) : AuUInt64(-1);
    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(uDeadline, uChugCursor, nullptr, false))
        {
            uTriggered++;
        }

        PumpHooks();

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return uTriggered;
}
|
|
|
|
// Like WaitAny, but returns the sources collected by the first pass that
// reported a trigger. timeout is in milliseconds; zero means "no deadline".
// Passes that report nothing are repeated while hEvent_ is signalled.
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeout)
{
    AuList<AuSPtr<ILoopSource>> fired;

    const AuUInt64 uStart = AuTime::HighResClockMS();
    const AuUInt64 uDeadline = timeout ? (uStart + timeout) : AuUInt64(-1);
    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(uDeadline, uChugCursor, &fired, false))
        {
            break;
        }

        PumpHooks();

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return fired;
}
|
|
|
|
// Non-blocking counterpart of WaitAny: each pass uses a zero end time.
// Returns how many passes reported a trigger.
AuUInt32 LoopQueue::PumpNonblocking()
{
    AuUInt32 uTriggered {};
    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(0, uChugCursor, nullptr, false))
        {
            uTriggered++;
        }

        PumpHooks();

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return uTriggered;
}
|
|
|
|
// Non-blocking counterpart of WaitAnyEx: returns the sources collected by the
// first pass that reported a trigger (possibly an empty list).
AuList<AuSPtr<ILoopSource>> LoopQueue::PumpNonblockingEx()
{
    AuList<AuSPtr<ILoopSource>> fired;
    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(0, uChugCursor, &fired, false))
        {
            break;
        }

        PumpHooks();

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return fired;
}
|
|
|
|
// Slow-path wait-any over the OR array, one section of at most
// MAXIMUM_WAIT_OBJECTS handles at a time, sleeping up to slowTickMs_ per
// section.
//
// internalEndTime:  absolute HighResClockMS deadline; 0 or AuUInt64(-1)
//                   disables the deadline check.
// chuggerIndex:     in/out cursor into the OR array, kept by the caller so a
//                   later call resumes at the section that last triggered.
// indexOfTriggered: out; index into the OR array of the signalled handle,
//                   handleArrayOr_.size() for window-message input, or
//                   AuUInt32(-1) if nothing fired.
//
// Returns true if a handle (or message input) fired, false on deadline,
// lock-releaser wake up, failure, or when every section timed out.
bool LoopQueue::ChugWaitAny(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered)
{
    AuUInt32 &index {chuggerIndex};

    const AuUInt32 count = this->handleArrayOr_.size();
    indexOfTriggered = -1;

    // Restart from the first section once the previous scan ran to the end
    // (index == count) or the array shrank underneath the cursor.
    if (index >= count)
    {
        index = 0;
    }

    bool active = this->hEvent_ == INVALID_HANDLE_VALUE;

    while (count != index)
    {
        auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));

        auto sleepMS = this->slowTickMs_;
        if (internalEndTime && internalEndTime != AuUInt64(-1))
        {
            auto now = AuTime::HighResClockMS();
            auto delta = AuInt64(internalEndTime) - AuInt64(now);

            if (delta <= 0)
            {
                return false;
            }

            sleepMS = AuMin(sleepMS, AuUInt32(delta));
        }

        DWORD status {};
        if (this->bIsWinLoop_)
        {
            do
            {
                status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, sleepMS, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
            }
            while (status == WAIT_IO_COMPLETION);

            if (status == next)
            {
                // Message input rather than a handle
                indexOfTriggered = this->handleArrayOr_.size();
                return true;
            }
        }
        else
        {
            do
            {
                status = ::WaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, false, sleepMS, true);
            }
            while (status == WAIT_IO_COMPLETION);
        }

        if (WaitStatusFromAligned(status, active))
        {
            DWORD offset = status - WAIT_OBJECT_0;
            indexOfTriggered = offset + index;
            return true;
        }

        // Slot zero of a section is the lock releaser (see BuildFromAndArray)
        if (status == WAIT_OBJECT_0 && !active)
        {
            return false;
        }

        if (status == WAIT_FAILED)
        {
            return false;
        }

        // Advance to the next section. Stepping by `count` would end the scan
        // after the first section and leave every later handle unobserved.
        index += next;
    }

    return false;
}
|
|
|
|
// One wait-any pass over the queue.
//
// internalEndTime: absolute HighResClockMS deadline; 0 = do not block,
//                  AuUInt64(-1) = wait indefinitely.
// chuggerIndex:    cursor forwarded to ChugWaitAny on the slow path.
// trigger:         optional out list receiving the sources that were dispatched.
// poll:            when true, triggered sources are not dispatched; their wait
//                  slot is swapped for hDummy_ and the real handle is
//                  remembered in handleArrayTainted_ so that a later non-poll
//                  pass dispatches them.
//
// Returns true if anything was counted as triggered during the pass.
// NOTE(review): the early `return false` on an already expired deadline skips
// the OnFinishSleep() calls that pair with OnPresleep() - confirm intended.
bool LoopQueue::WaitAnyNBSpurious(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll)
{
    bool status {};
    AuUInt32 indexOfTriggered {};
    AuUInt32 triggeredCount {};

    AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me

    if (trigger)
    {
        trigger->reserve(this->loopSourceExs_.size());
    }

    for (const auto &source : this->loopSourceExs_)
    {
        source.source->OnPresleep();
    }

    if ((this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_))
    {
        status = ChugWaitAny(internalEndTime, chuggerIndex, indexOfTriggered);
    }
    else
    {
        AuUInt32 sleepDelta {};
        DWORD temp {};

        if (internalEndTime)
        {
            if (internalEndTime == AuUInt64(-1))
            {
                sleepDelta = INFINITE;
            }
            else
            {
                auto now = AuTime::HighResClockMS();
                if (internalEndTime <= now)
                {
                    return false;
                }
                sleepDelta = internalEndTime - now;
            }
        }
        else
        {
            sleepDelta = 0;
        }

        do
        {
            if (this->bIsWinLoop_)
            {
                temp = ::MsgWaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
            }
            else
            {
                temp = ::WaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), false, sleepDelta, true);
            }
        }
        while (temp == WAIT_IO_COMPLETION);

        status = WaitToRetStatus(temp);

        if (status)
        {
            indexOfTriggered = temp - WAIT_OBJECT_0;
        }
    }

    // An index one past the end of the OR array denotes window-message input
    if (indexOfTriggered == this->handleArrayOr_.size())
    {
        TryPumpWin32();
        triggeredCount++;
    }
    else
    {
        if (TryPumpWin32())
        {
            triggeredCount++;
        }
    }

    bool isPump = this->handleArrayOr_.size() == indexOfTriggered;

    StartUserAndTakeOwn();

    AuUInt firstTriggered {};
    if (status)
    {
        if (!isPump)
        {
            firstTriggered = reinterpret_cast<AuUInt>(this->handleArrayOr_[indexOfTriggered]);
        }

        Iterator queueIterator(this);
        for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
        {
            bool shouldRemove {false};
            auto &source = *queueIterator.itr;

            AuUInt lastHandle {};
            bool wasTriggered {};

            if (source.source)
            {
                bool singular = source.source->Singular();

                // The slot was swapped for hDummy_ by an earlier poll pass
                bool bPollTriggered = this->handleArrayOr_[queueIterator.startingIndexOr] == this->hDummy_;

                if (!bPollTriggered)
                {
                    if (!singular)
                    {
                        for (const auto &handle : source.source->GetHandles())
                        {
                            if ((firstTriggered == handle) ||
                                (::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
                            {
                                lastHandle = handle;
                                wasTriggered = true;
                                break;
                            }
                        }
                    }
                    else
                    {
                        auto handle = source.source->GetHandle();
                        if ((firstTriggered == handle) ||
                            (::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
                        {
                            // Report this source's own handle. Using
                            // firstTriggered here handed a foreign (or zero)
                            // handle to OnTrigger and to the tainted list
                            // whenever the source was found via the zero-wait
                            // probe instead of being the first wake up.
                            lastHandle = handle;
                            wasTriggered = true;
                        }
                    }
                }

                if (bPollTriggered || (wasTriggered && source.source->OnTrigger(lastHandle)))
                {
                    bool failing {};

                    if (poll)
                    {
                        if (!bPollTriggered)
                        {
                            // Remember the real handle and park the slot on hDummy_
                            failing = !AuTryInsert(this->handleArrayTainted_, AuMakeTuple(queueIterator.startingIndexAnd, 1, reinterpret_cast<HANDLE>(lastHandle)));

                            if (!failing)
                            {
                                triggeredCount++;

                                this->handleArrayAnd_[queueIterator.startingIndexAnd] = reinterpret_cast<HANDLE>(this->hDummy_);
                                this->handleArrayOr_[queueIterator.startingIndexOr] = reinterpret_cast<HANDLE>(this->hDummy_);
                            }
                        }
                        else
                        {
                            triggeredCount++;
                        }
                    }

                    if (!poll)
                    {
                        if (this->handleArrayTainted_.size() &&
                            bPollTriggered)
                        {
                            AuTuple<int, int, HANDLE> ref;

                            bool matched = AuRemoveIf(this->handleArrayTainted_, [&](const AuTuple<int, int, HANDLE> &in) -> bool
                            {
                                bool bMatched = queueIterator.startingIndexAnd == AuGet<0>(in);
                                if (!bMatched)
                                {
                                    return false;
                                }

                                ref = in;
                                return true;
                            });

                            if (matched)
                            {
                                if (AuGet<1>(ref) == 1)
                                {
                                    // Restore the real handle parked by the poll pass
                                    auto handle = AuGet<2>(ref);
                                    this->handleArrayOr_[queueIterator.startingIndexOr] = handle;
                                    this->handleArrayAnd_[queueIterator.startingIndexAnd] = handle;
                                }
                            }
                        }
                    }

                    if (!poll || failing)
                    {
                        triggeredCount++;

                        // OnTrigger was already consulted above, hence -1
                        auto [ticked, bShouldRemove] = source.DoWork(this, -1);
                        shouldRemove = bShouldRemove;

                        if (trigger)
                        {
                            AuTryInsert(*trigger, source.source);
                        }
                    }
                }
            }

            source.source->OnFinishSleep();

            if (shouldRemove)
            {
                if (source.source->GetType() == ELoopSource::eSourceWin32)
                {
                    SysPanic("?");
                }
                else
                {
                    AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size());

                    if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS)
                    {
                        // NOTE(review): RemoveSourceNB erases from loopSourceExs_
                        // while this loop still holds an iterator into it - verify.
                        this->RemoveSourceNB(source.source);
                        this->willCommitInFuture_ = true;
                    }
                    else
                    {
                        AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, uHandles);
                        AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, uHandles);
                        queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
                        continue;
                    }
                }
            }

            queueIterator.Next();
        }
    }
    else
    {
        ConsiderEvicitingTimeoutsAll();
    }

    StartUserAndTakeOwn_Release();

    return triggeredCount;
}
|
|
|
|
void LoopQueue::StartUserAndTakeOwn()
|
|
{
|
|
this->rwMutex_->UpgradeReadToWrite(0);
|
|
|
|
this->isCommitableInFuture_ = true;
|
|
}
|
|
|
|
|
|
bool LoopQueue::TryPumpWin32()
|
|
{
|
|
bool bMsgPump {};
|
|
|
|
if (!this->msgSource_)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
if (AuStaticCast<Win32Dummy>(this->msgSource_)->bIsPumping_)
|
|
{
|
|
MSG msg;
|
|
|
|
try
|
|
{
|
|
while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE))
|
|
{
|
|
TranslateMessage(&msg);
|
|
DispatchMessageW(&msg);
|
|
bMsgPump = true;
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception.");
|
|
}
|
|
}
|
|
else
|
|
{
|
|
MSG msg;
|
|
if (PeekMessageW(&msg, NULL, 0, 0, PM_NOREMOVE))
|
|
{
|
|
bMsgPump = true;
|
|
}
|
|
}
|
|
|
|
|
|
// Notify all and remove if unwanted
|
|
if (bMsgPump)
|
|
{
|
|
AU_LOCK_GUARD(this->sourceMutex_);
|
|
|
|
bool bShouldRemove = true;
|
|
for (const auto &handler : this->msgCallbacks_.base)
|
|
{
|
|
try
|
|
{
|
|
bShouldRemove &= handler->OnFinished(this->msgSource_);
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
|
|
AuUInt i {};
|
|
for (const auto &handler : this->msgCallbacks_.extended)
|
|
{
|
|
try
|
|
{
|
|
bShouldRemove &= handler->OnFinished(this->msgSource_, i++);
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
|
|
for (const auto &handler : this->globalSubcribers_)
|
|
{
|
|
try
|
|
{
|
|
bShouldRemove &= handler->OnFinished(this->msgSource_);
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
|
|
if (bShouldRemove)
|
|
{
|
|
this->bIsWinLoop_ = false;
|
|
}
|
|
}
|
|
|
|
return bMsgPump;
|
|
}
|
|
|
|
void LoopQueue::StartUserAndTakeOwn_Release()
|
|
{
|
|
this->isCommitableInFuture_ = false;
|
|
|
|
if (this->willCommitInFuture_)
|
|
{
|
|
this->CommitLocked();
|
|
}
|
|
|
|
this->rwMutex_->DowngradeWriteToRead();
|
|
}
|
|
|
|
// callee must own handle array
|
|
void LoopQueue::ConsiderEvicitingTimeoutsAll()
|
|
{
|
|
Iterator queueIterator(this);
|
|
bool bRebuildFromAnd {};
|
|
|
|
auto now = AuTime::HighResClockMS();
|
|
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
|
|
{
|
|
|
|
auto &source = *queueIterator.itr;
|
|
if (!source.ConsiderTimeout(now))
|
|
{
|
|
queueIterator.Next();
|
|
source.source->OnFinishSleep();
|
|
continue;
|
|
}
|
|
|
|
source.source->OnFinishSleep();
|
|
|
|
if (source.source->GetType() == ELoopSource::eSourceWin32)
|
|
{
|
|
// Null message loop hack
|
|
this->msgSource_.reset();
|
|
this->msgCallbacks_ = {};
|
|
this->bIsWinLoop_ = false;
|
|
}
|
|
else
|
|
{
|
|
auto count = source.source->Singular() ? 1 : source.source->GetHandles().size();
|
|
|
|
if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS)
|
|
{
|
|
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
|
|
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
|
|
}
|
|
else
|
|
{
|
|
bRebuildFromAnd = true;
|
|
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
|
|
}
|
|
|
|
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
|
|
break;
|
|
}
|
|
|
|
queueIterator.Next();
|
|
}
|
|
|
|
if (bRebuildFromAnd)
|
|
{
|
|
BuildFromAndArray();
|
|
}
|
|
}
|
|
|
|
bool LoopQueue::AddHook(const AuFunction<void()> &func)
|
|
{
|
|
return AuTryInsert(this->epilogueHooks_, func);
|
|
}
|
|
|
|
void LoopQueue::PumpHooks()
|
|
{
|
|
auto c = AuExchange(this->epilogueHooks_, {});
|
|
for (auto &a : c)
|
|
{
|
|
a();
|
|
}
|
|
}
|
|
|
|
// Public factory: creates a Win32 loop queue behind the ILoopQueue interface.
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
{
    auto pQueue = AuMakeShared<LoopQueue>();
    return pQueue;
}
|
|
} |