AuroraRuntime/Source/Loop/LoopQueue.NT.cpp

864 lines
24 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: LoopQueue.NT.cpp
Date: 2022-1-8
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Loop.NT.hpp"
#include "ILoopSourceEx.hpp"
#include "LSWin32.NT.hpp"
#include "LoopQueue.NT.hpp"
static bool WaitToRetStatus(DWORD ret)
{
return !((ret == WAIT_TIMEOUT) ||
(ret == WAIT_IO_COMPLETION) ||
(ret == WAIT_FAILED));
}
static bool WaitStatusFromAligned(DWORD ret, bool active)
{
return (WaitToRetStatus(ret) && ((active) || (ret != WAIT_OBJECT_0 /*lock releaser*/)));
}
namespace Aurora::Loop
{
LoopQueue::LoopQueue()
{
this->hEvent_ = CreateEventW(NULL, false, false, NULL);
}
LoopQueue::~LoopQueue()
{
AuWin32CloseHandle(this->hEvent_);
}
bool LoopQueue::IsValid()
{
return this->hEvent_ != INVALID_HANDLE_VALUE && this->handleArray_.size();
}
struct LoopJoinable
{
LoopQueue *that;
void Lock()
{
that->Sync();
}
void Unlock()
{
that->Unlock();
}
};
void LoopQueue::Sync()
{
if (this->hEvent_ != INVALID_HANDLE_VALUE)
{
this->rwMutex_->AsWritable()->Lock();
return;
}
SetEvent(this->hEvent_);
this->rwMutex_->AsWritable()->Lock();
ResetEvent(this->hEvent_);
}
void LoopQueue::Unlock()
{
this->rwMutex_->AsWritable()->Unlock();
}
bool LoopQueue::SourceAdd(const AuSPtr<ILoopSource> &source)
{
AU_LOCK_GUARD(this->sourceMutex_);
if (!AuTryInsert(this->sources_, AuMakeTuple(source, 0)))
{
return false;
}
return true;
}
bool LoopQueue::SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 maxTimeout)
{
AU_LOCK_GUARD(this->sourceMutex_);
if (!AuTryInsert(this->sources_, AuMakeTuple(source, AuTime::CurrentInternalClockMS() + maxTimeout)))
{
return false;
}
return true;
}
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->removedSources_, source);
}
void LoopQueue::RemoveSourceNB(const AuSPtr<ILoopSource> &source, bool removeHandles)
{
AU_LOCK_GUARD(this->sourceMutex_); // me
AuTryRemoveByTupleN<0>(this->sources_, source); // guarded by ^
AuRemoveIf(this->loopSourceExs_, [source](const auto &re)
{
return re.sourceBase == source;
}); // guarded by the loop joinable
if (source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
return;
}
if (!removeHandles)
{
return;
}
AuUInt handleIndex {};
for (const auto &sourceEx : this->loopSourceExs_)
{
AuUInt lastHandle {};
bool wasTriggered {};
auto handles = sourceEx.source->GetHandles();
auto lsStartIndex = handleIndex;
auto delta = handles.size();
if ((handleIndex % 64) == 0)
{
handleIndex++;
}
handleIndex += delta;
if (sourceEx.source == source)
{
AuRemoveRange(this->handleArray_, lsStartIndex, delta);
}
}
}
bool LoopQueue::Commit()
{
LoopJoinable joinable {this};
AU_LOCK_GUARD(joinable);
try
{
for (const auto &re : this->removedSources_)
{
RemoveSourceNB(re, this->sources_.empty());
removedSources_.clear();
}
if (this->sources_.empty() && this->removedSources_.size())
{
return true;
}
// Reset relevant OS cache
this->bIsWinLoop_ = false;
this->bIsThreadSafe_ = false;
AuTryClear(this->loopSourceExs_);
AuTryClear(this->handleArray_);
// Reserve the cache arrays for initialization
this->loopSourceExs_.reserve(this->sources_.size());
this->handleArray_.reserve(this->sources_.size());
//
for (const auto &[source, timeout] : sources_)
{
// Filter bad sources
if (!source)
{
continue;
}
// Win32 edge case
if (source->GetType() == ELoopSource::eSourceWin32)
{
this->bIsWinLoop_ = true;
this->msgSource_ = source;
continue;
}
// Handle known ILoopSourceEx handle objects
if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
{
ExtendeSourceInfo t {extended};
t.timeoutAbs = timeout;
t.sourceBase = source;
t.source = extended;
this->loopSourceExs_.push_back(t);
if ((this->handleArray_.size() % MAXIMUM_WAIT_OBJECTS) == 0)
{
if (this->hEvent_ != INVALID_HANDLE_VALUE)
{
this->bIsThreadSafe_ = true;
this->handleArray_.push_back(hEvent_);
}
}
for (const auto &handle : extended->GetHandles())
{
auto nthandle = reinterpret_cast<HANDLE>(handle);
this->handleArray_.push_back(nthandle);
}
}
}
return true;
}
catch (...)
{
AuTryClear(this->loopSourceExs_);
AuTryClear(this->handleArray_);
return {};
}
}
AuUInt32 LoopQueue::GetSourceCount()
{
return this->loopSourceExs_.size();
}
void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount)
{
this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS));
this->slowTickMs_ = sectionTickTime ? sectionTickTime : 1;
}
void LoopQueue::ChugHint(bool value)
{
this->forceChug_ = value;
}
bool LoopQueue::AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_, subscriber);
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.specificSubscribers, subscriber))
{
return {};
}
count++;
}
return count;
}
bool LoopQueue::AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
return false;
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.timeoutSubscribers, subscriber))
{
return {};
}
count++;
}
return count;
}
bool LoopQueue::AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
AU_LOCK_GUARD(rwMutex_->AsReadable());
if (!subscriber)
{
SysPushErrorArg();
return {};
}
// yes, this seems insane; however, the spinlock is basically free.
// disturbing the rwMutex_ would mean spuriously waking up all the Wait[...]Nb functions
// unlikely, but whats even less likely is for each sources spinlock to be held.
// this should be a work init-bound function, not wait or commit, so the vector abuse doesnt matter so much
for (auto &sourceEx : this->loopSourceExs_)
{
AU_LOCK_GUARD(sourceEx.lock); // hack lock, but its a spin zoomer
if (!AuTryInsert(sourceEx.globalSubscribers, subscriber))
{
return {};
}
}
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_, subscriber);
}
return true;
}
bool LoopQueue::HasFinished()
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
AuUInt32 askers {};
return WaitAnyNBSpurious(INFINITE, askers, nullptr, true) == this->sources_.size();
}
bool LoopQueue::IsSignaled()
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
AuUInt32 askers {};
return WaitAnyNBSpurious(0, askers, nullptr, true);
}
bool LoopQueue::WaitAll(AuUInt32 timeout)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
bool bReturnStatus {true};
bool bTimeout {false};
AuUInt32 count {};
AuUInt32 index {};
if (this->handleArray_.empty())
{
return true;
}
count = this->handleArray_.size();
AuUInt32 startTime = AuTime::CurrentInternalClockMS();
AuUInt32 endTime = startTime + timeout;
bool active = this->hEvent_ == INVALID_HANDLE_VALUE;
while (count != index)
{
auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));
startTime = AuTime::CurrentInternalClockMS();
if (endTime <= startTime)
{
ConsiderEvicitingTimeoutsAll();
return false;
}
auto timeDelta = endTime - startTime; // TODO: cap to last obj
DWORD status {};
if (this->bIsWinLoop_)
{
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL);
}
else
{
status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, true, timeDelta, true);
}
if (WaitStatusFromAligned(status, active))
{
index += count;
// continue the sleep after succesfully passing MAXIMUM_WAIT_OBJECTS or [..., EOS] objects
continue;
}
// Account for updates
if (status == WAIT_OBJECT_0 && !active)
{
count = this->handleArray_.size();
if (count == 0)
{
return true;
}
else if (index > count)
{
index = 0;
}
continue;
}
// Account for win32 failures
if (status == WAIT_FAILED)
{
bReturnStatus = false;
break;
}
// Ingore the other unrelated errors (APC notification, timeout, etc)
}
ConsiderEvicitingTimeoutsAll();
return bReturnStatus;
}
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout)
{
AuUInt32 ret {};
bool lastItr {};
AuUInt32 startTime = AuTime::CurrentInternalClockMS();
AuUInt32 endTime = timeout ? (startTime + timeout) : INFINITE;
AuUInt32 chuggerIndex {};
do
{
if (WaitAnyNBSpurious(endTime, chuggerIndex, nullptr, false))
{
ret++;
}
}
while (WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0);
return ret;
}
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeout)
{
AuList<AuSPtr<ILoopSource>> trigger;
AuUInt32 startTime = AuTime::CurrentInternalClockMS();
AuUInt32 endTime = timeout ? (startTime + timeout) : INFINITE;
AuUInt32 chuggerIndex {};
do
{
if (WaitAnyNBSpurious(endTime, chuggerIndex, &trigger, false))
{
return trigger;
}
}
while (WaitForSingleObject(this->hEvent_, 0) == WAIT_OBJECT_0);
return trigger;
}
bool LoopQueue::ChugWaitAny(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered)
{
AuUInt32 count {};
AuUInt32 &index {chuggerIndex};
count = this->handleArray_.size();
indexOfTriggered = -1;
if (index > count) index = 0;
bool active = this->hEvent_ == INVALID_HANDLE_VALUE;
while (count != index)
{
auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));
if (timeout && timeout <= AuTime::CurrentInternalClockMS())
{
return false;
}
DWORD status {};
if (this->bIsWinLoop_)
{
status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, this->slowTickMs_, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
if (status == next)
{
return this->handleArray_.size();
}
}
else
{
status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, false, this->slowTickMs_, true);
}
if (WaitStatusFromAligned(status, active))
{
DWORD offset = status - WAIT_OBJECT_0;
indexOfTriggered = offset + index;
return true;
}
if (status == WAIT_OBJECT_0 && !active)
{
return false;
}
if (status == WAIT_FAILED)
{
return false;
}
index += count;
}
return false;
}
bool LoopQueue::WaitAnyNBSpurious(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll)
{
bool status {};
DWORD temp;
AuUInt32 indexOfTriggered {};
AuUInt32 triggeredCount {};
AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me
if (trigger)
{
trigger->reserve(this->sources_.size());
}
for (const auto &source : this->loopSourceExs_)
{
source.source->OnPresleep();
}
if ((this->handleArray_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_))
{
status = ChugWaitAny(timeout, chuggerIndex, indexOfTriggered);
}
else
{
auto now = AuTime::CurrentInternalClockMS();
if (timeout && timeout <= now)
{
return false;
}
auto sleepDelta = timeout - now;
if (this->bIsWinLoop_)
{
temp = ::MsgWaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
}
else
{
temp = ::WaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), false, sleepDelta, true);
}
status = WaitToRetStatus(temp);
if (status)
{
indexOfTriggered = temp - WAIT_OBJECT_0;
}
}
bool isPump = this->handleArray_.size() == indexOfTriggered;
AuUInt firstTriggered {};
if (status)
{
if (!isPump)
{
firstTriggered = reinterpret_cast<AuUInt>(this->handleArray_[indexOfTriggered]);
}
AuUInt handleIndex {};
for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; )
{
auto &source = *itr;
AuUInt lastHandle {};
bool wasTriggered {};
auto handles = source.source->GetHandles();
if ((handleIndex % 64) == 0)
{
handleIndex++;
}
auto lsStartIndex = handleIndex;
for (const auto &handle : handles)
{
handleIndex++;
if ((firstTriggered == handle) ||
(::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
{
lastHandle = handle;
wasTriggered = true;
break;
}
}
bool shouldRemove {false};
if (wasTriggered && source.source->OnTrigger(lastHandle))
{
triggeredCount++;
shouldRemove = true;
AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack
for (const auto &handler : source.specificSubscribers)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (shouldRemove)
{
for (const auto &handler : source.timeoutSubscribers)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if (shouldRemove)
{
for (const auto &handler : source.globalSubscribers)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if ((source.specificSubscribers.empty()) &&
(source.globalSubscribers.empty()) &&
(source.timeoutSubscribers.empty()))
{
shouldRemove = false;
}
}
if (shouldRemove)
{
if (trigger)
{
AuTryInsert(*trigger, source.source);
}
}
if (source.ConsiderTimeout())
{
shouldRemove = true;
}
source.source->OnFinishSleep();
if (shouldRemove)
{
// Evict from OS cache
AuRemoveRange(this->handleArray_, lsStartIndex, handles.size());
{
AU_LOCK_GUARD(this->sourceMutex_);
AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase);
}
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
}
itr = this->loopSourceExs_.erase(itr);
handleIndex -= handles.size();
}
else
{
itr++;
}
}
}
else
{
ConsiderEvicitingTimeoutsAll();
}
if (isPump)
{
triggeredCount++;
// Notify all and remove if unwanted
{
AU_LOCK_GUARD(this->sourceMutex_);
if (trigger)
{
AuTryInsert(*trigger, this->msgSource_);
}
bool shouldRemove {true};
for (const auto &handler : this->msgCallbacks_)
{
try
{
shouldRemove &= handler->OnFinished(this->msgSource_);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (shouldRemove)
{
this->bIsWinLoop_ = false;
}
}
// Do the work of pumping the Win32 event queue if the user didn't flush the queue during the OnFinished callback
{
if (AuStaticCast<Win32Dummy>(this->msgSource_)->bIsPumping_)
{
MSG msg;
try
{
while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE))
{
TranslateMessage(&msg);
DispatchMessageW(&msg);
}
}
catch (...)
{
SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception.");
}
}
}
}
return triggeredCount;
}
void LoopQueue::ConsiderEvicitingTimeoutsAll()
{
AuUInt32 startIndex {};
for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; )
{
auto &source = *itr;
if ((startIndex % 64) == 0)
{
startIndex++;
}
auto count = source.source->GetHandles().size();
auto begin = startIndex;
startIndex += count;
if (!source.ConsiderTimeout())
{
itr++;
continue;
}
// Evict from OS cache
AuRemoveRange(this->handleArray_, startIndex, count);
{
AU_LOCK_GUARD(this->sourceMutex_);
AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase);
}
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
}
itr = this->loopSourceExs_.erase(itr);
}
}
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
{
return AuMakeShared<LoopQueue>();
}
}