AuroraRuntime/Source/Loop/LoopQueue.NT.cpp
Reece b0b9931586 [*] Update iconv
[*] nt queue
[-] Remove misplaced pragma once
2022-04-12 20:16:49 +01:00

1208 lines
37 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: LoopQueue.NT.cpp
Date: 2022-1-8
Author: Reece
Note: Win32 sewage lmao
***/
#include <Source/RuntimeInternal.hpp>
#include "Loop.NT.hpp"
#include "ILoopSourceEx.hpp"
#include "LSWin32.NT.hpp"
#include "LoopQueue.NT.hpp"
// Classifies a Win32 wait result.
// Returns false for WAIT_TIMEOUT, WAIT_IO_COMPLETION and WAIT_FAILED; every
// other value is reported as "an object satisfied the wait".
static bool WaitToRetStatus(DWORD ret)
{
    switch (ret)
    {
    case WAIT_TIMEOUT:
    case WAIT_IO_COMPLETION:
    case WAIT_FAILED:
        return false;
    default:
        return true;
    }
}
// Like WaitToRetStatus, but for a wait whose first slot holds the lock
// releaser event: unless `active` is set, WAIT_OBJECT_0 (the first slot) is
// not reported as a triggered source.
static bool WaitStatusFromAligned(DWORD ret, bool active)
{
    if (!WaitToRetStatus(ret))
    {
        return false;
    }

    if (active)
    {
        return true;
    }

    return ret != WAIT_OBJECT_0 /*lock releaser*/;
}
namespace Aurora::Loop
{
// Creates the two manual-reset events owned by the queue:
//  - hEvent_: initially unsignaled; Sync() sets it while waiting for the write lock
//  - hDummy_: initially signaled; WaitAnyNBSpurious uses it as a placeholder handle
//
// CreateEventW reports failure by returning NULL, whereas the rest of this file
// (IsValid, Sync, ChugWaitAny) tests against INVALID_HANDLE_VALUE. A failed
// creation is therefore normalised to INVALID_HANDLE_VALUE so those checks
// actually observe the failure.
LoopQueue::LoopQueue()
{
    const auto createManualResetEvent = [](bool bInitiallySignaled) -> HANDLE
    {
        HANDLE hCreated = ::CreateEventW(NULL, true, bInitiallySignaled, NULL);
        return hCreated ? hCreated : INVALID_HANDLE_VALUE;
    };

    this->hEvent_ = createManualResetEvent(false);
    this->hDummy_ = createManualResetEvent(true);
}
// Releases the two event handles created by the constructor.
LoopQueue::~LoopQueue()
{
    // The two handles are independent of one another; closing order is irrelevant
    AuWin32CloseHandle(this->hDummy_);
    AuWin32CloseHandle(this->hEvent_);
}
// True when the releaser event is not INVALID_HANDLE_VALUE and both cached
// handle arrays (wait-any and wait-all) are non-empty.
bool LoopQueue::IsValid()
{
    if (this->hEvent_ == INVALID_HANDLE_VALUE)
    {
        return false;
    }

    if (!this->handleArrayOr_.size())
    {
        return false;
    }

    return this->handleArrayAnd_.size() != 0;
}
struct LoopJoinable
{
LoopQueue *that;
void Lock()
{
that->Sync();
}
void Unlock()
{
that->Unlock();
}
};
// Takes the write side of rwMutex_. When the releaser event exists it is
// signaled for the duration of the lock acquisition and reset once the lock is
// held (the event sits in the wait arrays built by BuildFromAndArray).
void LoopQueue::Sync()
{
    const bool bHasReleaser = this->hEvent_ != INVALID_HANDLE_VALUE;

    if (bHasReleaser)
    {
        SetEvent(this->hEvent_);
    }

    this->rwMutex_->AsWritable()->Lock();

    if (bHasReleaser)
    {
        ResetEvent(this->hEvent_);
    }
}
// Releases the write lock taken by Sync().
void LoopQueue::Unlock()
{
    auto pWritable = this->rwMutex_->AsWritable();
    pWritable->Unlock();
}
// Queues `source` in addedSources_ with a timeout value of 0 and no callbacks.
// The entry is consumed by CommitLocked().
// Returns false when the insertion fails.
bool LoopQueue::SourceAdd(const AuSPtr<ILoopSource> &source)
{
    AU_LOCK_GUARD(this->sourceMutex_);

    const bool bQueued = AuTryInsert(this->addedSources_, AuMakeTuple(source, 0, SourceCallbacks {}));
    return bQueued;
}
// Queues `source` in addedSources_ with an absolute deadline of
// (current internal clock in ms + maxTimeout) and no callbacks.
// The entry is consumed by CommitLocked().
// Returns false when the insertion fails.
bool LoopQueue::SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 maxTimeout)
{
    AU_LOCK_GUARD(this->sourceMutex_);

    const bool bQueued = AuTryInsert(this->addedSources_, AuMakeTuple(source, AuTime::CurrentInternalClockMS() + maxTimeout, SourceCallbacks {}));
    return bQueued;
}
// Requests removal of `source` from the queue.
//
// Two things happen under sourceMutex_:
//  1. any not-yet-committed add of `source` is dropped from addedSources_;
//  2. `source` is queued in removedSources_ for the next CommitLocked().
//
// Previously the two operations were joined with `||`, so the pending add was
// only cancelled when queueing the removal failed. Because CommitLocked()
// processes removals before additions, an add followed by a remove ahead of a
// commit left the source in the queue.
//
// Returns true when either step did something.
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
{
    AU_LOCK_GUARD(this->sourceMutex_);

    const bool bDroppedPendingAdd = AuTryRemoveByTupleN<0>(this->addedSources_, source);
    const bool bQueuedRemoval = AuTryInsert(this->removedSources_, source);

    return bQueuedRemoval || bDroppedPendingAdd;
}
bool LoopQueue::RemoveSourceNB(const AuSPtr<ILoopSource> &source)
{
if (source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_.reset();
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
return {};
}
Iterator queueIterator(this);
bool bRebuildFromAnd {};
for ( queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
if (queueIterator.itr->sourceBase == source)
{
if (queueIterator.itr->source)
{
auto count = queueIterator.itr->source->Singular() ? 1 : queueIterator.itr->source->GetHandles().size();
if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS)
{
AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
else
{
bRebuildFromAnd = true;
AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
}
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
break;
}
else
{
queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
break;
}
}
queueIterator.Next();
}
return bRebuildFromAnd;
}
// Applies pending source additions/removals.
// While isCommitableInFuture_ is set (between StartUserAndTakeOwn and
// StartUserAndTakeOwn_Release) the commit is only flagged via
// willCommitInFuture_ and true is returned; otherwise the queue is write-locked
// through LoopJoinable and CommitLocked() runs immediately.
bool LoopQueue::Commit()
{
    if (!this->isCommitableInFuture_)
    {
        LoopJoinable joinable {this};
        AU_LOCK_GUARD(joinable);
        return CommitLocked();
    }

    this->willCommitInFuture_ = true;
    return true;
}
// Regenerates the wait-any array from the wait-all array.
// hEvent_ is inserted whenever the output length is a multiple of
// MAXIMUM_WAIT_OBJECTS, so every MAXIMUM_WAIT_OBJECTS-sized section of the
// result starts with the releaser event.
void LoopQueue::BuildFromAndArray()
{
    auto &orHandles = this->handleArrayOr_;

    orHandles.clear();
    orHandles.reserve(this->handleArrayAnd_.size() + 16);

    for (const auto &handle : this->handleArrayAnd_)
    {
        const bool bSectionStart = (orHandles.size() % MAXIMUM_WAIT_OBJECTS) == 0;
        if (bSectionStart)
        {
            orHandles.push_back(this->hEvent_);
        }

        orHandles.push_back(handle);
    }
}
// Applies removedSources_ and then addedSources_ to the committed state
// (loopSourceExs_ plus the handle arrays), rebuilding the wait-any array when
// a removal requested it or when there were pending additions.
// Takes no lock itself; Commit() calls it under the write lock.
// Returns false if anything inside throws.
bool LoopQueue::CommitLocked()
{
    try
    {
        bool bRebuildOr {};

        // Removals first
        for (const auto &pendingRemoval : this->removedSources_)
        {
            bRebuildOr |= this->RemoveSourceNB(pendingRemoval);
        }
        this->removedSources_.clear();

        // Reserve the cache arrays for initialization
        const auto uExpected = this->loopSourceExs_.size() + this->addedSources_.size();
        this->loopSourceExs_.reserve(uExpected);
        this->handleArrayAnd_.reserve(uExpected);
        this->handleArrayOr_.reserve(uExpected);

        if (this->addedSources_.size() != 0)
        {
            bRebuildOr = true;
        }

        for (auto &[pSource, uTimeoutAbs, sourceCallbacks] : this->addedSources_)
        {
            // Filter bad sources
            if (!pSource)
            {
                continue;
            }

            // Win32 edge case: the message source is tracked outside of loopSourceExs_
            if (pSource->GetType() == ELoopSource::eSourceWin32)
            {
                this->bIsWinLoop_ = true;
                this->msgSource_ = pSource;
                this->msgCallbacks_ = AuMove(sourceCallbacks);
                continue;
            }

            // Only ILoopSourceEx handle objects are committed; anything else is skipped
            auto pExtended = AuDynamicCast<ILoopSourceEx>(pSource);
            if (!pExtended)
            {
                continue;
            }

            ExtendeSourceInfo t {pExtended};
            t.timeoutAbs = uTimeoutAbs;
            t.sourceBase = pSource;
            t.source = pExtended;
            t.callbacks = AuMove(sourceCallbacks);
            this->loopSourceExs_.push_back(t);

            if (!pExtended->Singular())
            {
                for (const auto &handle : pExtended->GetHandles())
                {
                    auto nthandle = reinterpret_cast<HANDLE>(handle);
                    this->handleArrayAnd_.push_back(nthandle);
                }
            }
            else
            {
                const auto handle = pExtended->GetHandle();
                auto nthandle = reinterpret_cast<HANDLE>(handle);
                this->handleArrayAnd_.push_back(nthandle);
            }
        }

        AuTryClear(this->addedSources_);

        if (bRebuildOr)
        {
            BuildFromAndArray();
        }

        return true;
    }
    catch (...)
    {
        return {};
    }
}
// Number of committed entries in loopSourceExs_ (the Win32 message source is
// tracked separately and not counted here).
AuUInt32 LoopQueue::GetSourceCount()
{
    const auto uCommitted = this->loopSourceExs_.size();
    return static_cast<AuUInt32>(uCommitted);
}
// Stores the chug-path settings:
//  - slowTickRate_: sectionDequeCount, capped at MAXIMUM_WAIT_OBJECTS
//  - slowTickMs_:   sectionTickTime, with 0 replaced by 1
void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount)
{
    this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS));

    if (sectionTickTime)
    {
        this->slowTickMs_ = sectionTickTime;
    }
    else
    {
        this->slowTickMs_ = 1;
    }
}
void LoopQueue::ChugHint(bool value)
{
this->forceChug_ = value;
}
bool LoopQueue::AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_.base, subscriber);
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.callbacks.base, subscriber))
{
return {};
}
count++;
}
if (!count)
{
AU_LOCK_GUARD(this->sourceMutex_);
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
{
if (innerSource == source)
{
if (!AuTryInsert(callbacks.base, subscriber))
{
return {};
}
count++;
}
}
}
return count;
}
bool LoopQueue::AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_.extended, subscriber);
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.callbacks.extended, subscriber))
{
return {};
}
count++;
}
if (!count)
{
AU_LOCK_GUARD(this->sourceMutex_);
for (auto &[innerSource, timeout, callbacks] : this->addedSources_)
{
if (innerSource == source)
{
if (!AuTryInsert(callbacks.extended, subscriber))
{
return {};
}
count++;
}
}
}
return count;
}
// Registers a subscriber in globalSubcribers_, the list consulted for every
// source after its own callbacks.
// Returns false on a null subscriber or a failed insertion.
bool LoopQueue::AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
    AU_LOCK_GUARD(this->rwMutex_->AsReadable());

    if (subscriber)
    {
        AU_LOCK_GUARD(this->sourceMutex_);
        return AuTryInsert(this->globalSubcribers_, subscriber);
    }

    SysPushErrorArg();
    return false;
}
bool LoopQueue::IsSignaledPeek()
{
AuUInt32 askers {};
return WaitAnyNBSpurious(0, askers, nullptr, true);
}
// Waits until every handle in the wait-all array is signaled, in batches of at
// most MAXIMUM_WAIT_OBJECTS, then notifies subscribers and evicts finished or
// timed-out sources.
//
// timeout: relative timeout in milliseconds, measured from entry.
//
// Returns:
//  - true  immediately when there are no handles to wait on
//  - false when the deadline passes before all batches completed (timed-out
//          sources are considered for eviction first)
//  - false when a wait call fails with WAIT_FAILED (the subscriber pass below
//          still runs in that case)
//  - true  otherwise
//
// Fix: a successful wait-all return means the whole batch is signaled, so the
// cursor now advances by the batch size. It previously advanced by
// (status + 1), which re-waited on handles that had already been consumed and,
// for abandoned-mutex return codes, pushed the cursor past `count`.
bool LoopQueue::WaitAll(AuUInt32 timeout)
{
    AU_LOCK_GUARD(this->rwMutex_->AsReadable());

    bool bReturnStatus {true};
    AuUInt32 count {};
    AuUInt32 index {};

    if (this->handleArrayAnd_.empty())
    {
        return true;
    }

    count = this->handleArrayAnd_.size();

    AuUInt64 startTime = AuTime::CurrentInternalClockMS();
    AuUInt64 endTime = startTime + timeout;

    for (const auto &source : this->loopSourceExs_)
    {
        source.source->OnPresleep();
    }

    while (count != index)
    {
        auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));

        startTime = AuTime::CurrentInternalClockMS();
        if (endTime <= startTime)
        {
            StartUserAndTakeOwn();
            ConsiderEvicitingTimeoutsAll();
            StartUserAndTakeOwn_Release();
            return false;
        }

        auto timeDelta = endTime - startTime; // TODO: cap to last obj

        DWORD status {};
        if (this->bIsWinLoop_)
        {
            status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL);
        }
        else
        {
            status = ::WaitForMultipleObjectsEx(next, this->handleArrayAnd_.data() + index, true, timeDelta, true);
        }

        // Message queue input: pump and wait on the same batch again
        if (status == WAIT_OBJECT_0 + next)
        {
            TryPumpWin32();
            continue;
        }

        // Whole batch satisfied: move on to the next batch
        if (WaitToRetStatus(status))
        {
            index += next;
            continue;
        }

        // Account for win32 failures
        if (status == WAIT_FAILED)
        {
            bReturnStatus = false;
            break;
        }

        // Ignore the other unrelated results (APC notification, timeout, etc);
        // the deadline check at the top of the loop bounds the retries
    }

    // OK - All signals are set
    // Take ownership of the queue ready for a potential purge of objects
    StartUserAndTakeOwn();

    // Le great iterate
    Iterator queueIterator(this);
    AuSInt indexOffset {};
    auto now = AuTime::CurrentInternalClockMS();

    for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
    {
        bool shouldRemove {true};
        auto &source = *queueIterator.itr;

        if (source.source)
        {
            if (source.source->OnTrigger(source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]))
            {
                AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack

                // A source is only removed when every consulted handler agrees
                for (const auto &handler : source.callbacks.extended)
                {
                    try
                    {
                        shouldRemove &= handler->OnFinished(source.source);
                    }
                    catch (...)
                    {
                        SysPushErrorCatch();
                    }
                }

                if (shouldRemove)
                {
                    for (const auto &handler : source.callbacks.base)
                    {
                        try
                        {
                            shouldRemove &= handler->OnFinished(source.source);
                        }
                        catch (...)
                        {
                            SysPushErrorCatch();
                        }
                    }
                }

                if (shouldRemove)
                {
                    AU_LOCK_GUARD(this->sourceMutex_); // TODO...
                    for (const auto &handler : this->globalSubcribers_)
                    {
                        try
                        {
                            shouldRemove &= handler->OnFinished(source.source);
                        }
                        catch (...)
                        {
                            SysPushErrorCatch();
                        }
                    }
                }

                // Sources without any callbacks of their own are kept
                if ((source.callbacks.base.empty()) &&
                    (source.callbacks.extended.empty()))
                {
                    shouldRemove = false;
                }
            }
        }

        if (!shouldRemove && source.ConsiderTimeout(now))
        {
            shouldRemove = true;
        }

        source.source->OnFinishSleep();

        if (shouldRemove)
        {
            if (source.source->GetType() == ELoopSource::eSourceWin32)
            {
                SysPanic("?");
            }
            else
            {
                AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size());

                if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS)
                {
                    // NOTE(review): RemoveSourceNB erases from loopSourceExs_ through its own
                    // iterator while this loop still holds one - confirm Iterator tolerates this
                    this->RemoveSourceNB(source.source);
                    this->willCommitInFuture_ = true;
                }
                else
                {
                    AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles);
                    AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles);
                    queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
                    continue;
                }
            }
        }

        //else
        {
            queueIterator.Next();
        }
    }

    StartUserAndTakeOwn_Release();

    return bReturnStatus;
}
// Blocking wait-any. A timeout of 0 means "no deadline".
// WaitAnyNBSpurious is repeated for as long as the releaser event is found
// signaled after a pass; the return value counts the passes that reported a
// trigger.
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout)
{
    const AuUInt64 uEntryTime = AuTime::CurrentInternalClockMS();

    AuUInt64 uDeadline = AuUInt64(-1);
    if (timeout)
    {
        uDeadline = uEntryTime + timeout;
    }

    AuUInt32 uTriggeredPasses {};
    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(uDeadline, uChugCursor, nullptr, false))
        {
            uTriggeredPasses++;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return uTriggeredPasses;
}
// Blocking wait-any that hands WaitAnyNBSpurious a list to fill and returns it.
// A timeout of 0 means "no deadline". Returns after the first pass that reports
// a trigger, or once the releaser event is no longer signaled after a pass.
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeout)
{
    AuList<AuSPtr<ILoopSource>> fired;

    const AuUInt64 uEntryTime = AuTime::CurrentInternalClockMS();

    AuUInt64 uDeadline = AuUInt64(-1);
    if (timeout)
    {
        uDeadline = uEntryTime + timeout;
    }

    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(uDeadline, uChugCursor, &fired, false))
        {
            break;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return fired;
}
// Zero-timeout variant of WaitAny: WaitAnyNBSpurious is repeated for as long as
// the releaser event is found signaled after a pass; the return value counts
// the passes that reported a trigger.
AuUInt32 LoopQueue::PumpNonblocking()
{
    AuUInt32 uTriggeredPasses {};
    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(0, uChugCursor, nullptr, false))
        {
            uTriggeredPasses++;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return uTriggeredPasses;
}
// Zero-timeout variant of WaitAnyEx: returns the list handed to
// WaitAnyNBSpurious after the first pass that reports a trigger, or once the
// releaser event is no longer signaled after a pass.
AuList<AuSPtr<ILoopSource>> LoopQueue::PumpNonblockingEx()
{
    AuList<AuSPtr<ILoopSource>> fired;
    AuUInt32 uChugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(0, uChugCursor, &fired, false))
        {
            break;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return fired;
}
// Wait-any over a handle array that may exceed MAXIMUM_WAIT_OBJECTS: the array
// is walked one section at a time, each section being waited on for at most
// slowTickMs_ (or the time left until internalEndTime, if sooner).
//
// internalEndTime:  absolute deadline on the internal ms clock; 0 and
//                   AuUInt64(-1) both disable the deadline check here
// chuggerIndex:     in/out cursor into handleArrayOr_, kept by the caller
//                   between calls so a later call resumes where this one stopped
// indexOfTriggered: out; index into handleArrayOr_ of the satisfied handle,
//                   handleArrayOr_.size() when Win32 messages are available,
//                   otherwise AuUInt32(-1)
//
// Returns true when indexOfTriggered was set to something meaningful. Returns
// false on deadline expiry, WAIT_FAILED, a lock-releaser wake up (first slot of
// a section), or when the sweep reached the end without a hit.
//
// Fixes:
//  - the cursor now advances by the section just waited on; it used to advance
//    by `count`, which skipped every section after the first and, when resuming
//    from a non-zero cursor, moved past `count` (unsigned underflow in
//    `count - index` and an out-of-bounds wait)
//  - a cursor left at `count` by a finished sweep now wraps to 0
//  - the remaining time is compared in 64 bits before being narrowed
bool LoopQueue::ChugWaitAny(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered)
{
    AuUInt32 &index {chuggerIndex};
    const AuUInt32 count = this->handleArrayOr_.size();

    indexOfTriggered = -1;

    if (index >= count)
    {
        index = 0;
    }

    // Without a releaser event, slot 0 is treated like any other handle
    const bool active = this->hEvent_ == INVALID_HANDLE_VALUE;

    while (count != index)
    {
        const auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));

        auto sleepMS = this->slowTickMs_;
        if (internalEndTime && internalEndTime != AuUInt64(-1))
        {
            auto now = AuTime::CurrentInternalClockMS();
            auto delta = AuInt64(internalEndTime) - AuInt64(now);
            if (delta <= 0)
            {
                return false;
            }

            if (AuUInt64(delta) < AuUInt64(sleepMS))
            {
                sleepMS = AuUInt32(delta);
            }
        }

        DWORD status {};
        if (this->bIsWinLoop_)
        {
            status = ::MsgWaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, sleepMS, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
            if (status == next)
            {
                // Message queue input; the caller recognises size() as "pump"
                indexOfTriggered = this->handleArrayOr_.size();
                return true;
            }
        }
        else
        {
            status = ::WaitForMultipleObjectsEx(next, this->handleArrayOr_.data() + index, false, sleepMS, true);
        }

        if (WaitStatusFromAligned(status, active))
        {
            DWORD offset = status - WAIT_OBJECT_0;
            indexOfTriggered = offset + index;
            return true;
        }

        // Lock releaser: bail out so the caller can drop the read lock
        if (status == WAIT_OBJECT_0 && !active)
        {
            return false;
        }

        if (status == WAIT_FAILED)
        {
            return false;
        }

        // Timeout / APC: move on to the next section
        index += next;
    }

    return false;
}
// One wait-any pass over the queue. May return without any source having
// triggered (timeout, APC, or the lock releaser event).
//
// internalEndTime: absolute deadline on the internal ms clock; 0 polls without
//                  blocking, AuUInt64(-1) waits without a deadline
// chuggerIndex:    cursor forwarded to ChugWaitAny when the chug path is taken
// trigger:         optional; receives the sources that are about to be removed
//                  because their handlers finished
// poll:            when true, triggered sources are not dispatched; their slot
//                  in both handle arrays is replaced with hDummy_ and the
//                  original handle is remembered in handleArrayTainted_. A later
//                  non-poll pass restores the handle and dispatches the source.
//
// Returns true when at least one source was counted as triggered or Win32
// messages were available.
//
// Fix: for singular sources the handle that actually triggered is now recorded
// (`lastHandle = handle`). It used to record `firstTriggered`, which is a
// different handle (or 0) whenever the source was discovered through the
// zero-timeout poll instead of being the handle that woke the wait; that value
// was then passed to OnTrigger and stored/restored via handleArrayTainted_.
bool LoopQueue::WaitAnyNBSpurious(AuUInt64 internalEndTime, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll)
{
    bool status {};
    DWORD temp {};
    AuUInt32 indexOfTriggered {};
    AuUInt32 triggeredCount {};

    AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me

    if (trigger)
    {
        trigger->reserve(this->loopSourceExs_.size());
    }

    for (const auto &source : this->loopSourceExs_)
    {
        source.source->OnPresleep();
    }

    if ((this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_))
    {
        status = ChugWaitAny(internalEndTime, chuggerIndex, indexOfTriggered);
    }
    else
    {
        AuUInt32 sleepDelta {};

        if (internalEndTime)
        {
            if (internalEndTime == AuUInt64(-1))
            {
                sleepDelta = INFINITE;
            }
            else
            {
                auto now = AuTime::CurrentInternalClockMS();
                if (internalEndTime <= now)
                {
                    return false;
                }
                sleepDelta = internalEndTime - now;
            }
        }
        else
        {
            sleepDelta = 0;
        }

        if (this->bIsWinLoop_)
        {
            temp = ::MsgWaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
        }
        else
        {
            temp = ::WaitForMultipleObjectsEx(this->handleArrayOr_.size(), this->handleArrayOr_.data(), false, sleepDelta, true);
        }

        status = WaitToRetStatus(temp);
        if (status)
        {
            indexOfTriggered = temp - WAIT_OBJECT_0;
        }
    }

    // An index one past the end of the array denotes "Win32 messages available"
    bool isPump = this->handleArrayOr_.size() == indexOfTriggered;

    StartUserAndTakeOwn();

    AuUInt firstTriggered {};
    if (status)
    {
        if (!isPump)
        {
            firstTriggered = reinterpret_cast<AuUInt>(this->handleArrayOr_[indexOfTriggered]);
        }

        {
            Iterator queueIterator(this);
            AuSInt indexOffset {};
            auto now = AuTime::CurrentInternalClockMS();

            for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
            {
                bool shouldRemove {false};
                auto &source = *queueIterator.itr;
                AuUInt lastHandle {};
                bool wasTriggered {};

                if (source.source)
                {
                    bool singular = source.source->Singular();

                    // A slot holding hDummy_ was marked as triggered by an earlier poll pass
                    bool bPollTriggered = this->handleArrayOr_[queueIterator.startingIndexOr] == this->hDummy_;

                    if (!bPollTriggered)
                    {
                        // Either this is the handle that woke us, or a zero-timeout probe says it is signaled too
                        if (!singular)
                        {
                            for (const auto &handle : source.source->GetHandles())
                            {
                                if ((firstTriggered == handle) ||
                                    (::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
                                {
                                    lastHandle = handle;
                                    wasTriggered = true;
                                    break;
                                }
                            }
                        }
                        else
                        {
                            auto handle = source.source->GetHandle();
                            if ((firstTriggered == handle) ||
                                (::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
                            {
                                lastHandle = handle;
                                wasTriggered = true;
                            }
                        }
                    }

                    if (bPollTriggered || (wasTriggered && source.source->OnTrigger(lastHandle)))
                    {
                        bool failing {};

                        if (poll)
                        {
                            if (!bPollTriggered)
                            {
                                // Remember the real handle and park hDummy_ in its place
                                failing = !AuTryInsert(this->handleArrayTainted_, AuMakeTuple(queueIterator.startingIndexAnd, 1, reinterpret_cast<HANDLE>(lastHandle)));
                                if (!failing)
                                {
                                    triggeredCount++;
                                    this->handleArrayAnd_[queueIterator.startingIndexAnd ] = reinterpret_cast<HANDLE>(this->hDummy_);
                                    this->handleArrayOr_[queueIterator.startingIndexOr ] = reinterpret_cast<HANDLE>(this->hDummy_);
                                }
                            }
                        }

                        if (!poll)
                        {
                            if (this->handleArrayTainted_.size() &&
                                bPollTriggered)
                            {
                                // Restore the handle parked by the poll pass
                                AuTuple<int, int, HANDLE> ref;
                                bool matched = AuRemoveIf(this->handleArrayTainted_, [&](const AuTuple<int, int, HANDLE> &in) -> bool
                                {
                                    bool bMatched = queueIterator.startingIndexAnd == AuGet<0>(in);
                                    if (!bMatched)
                                    {
                                        return false;
                                    }
                                    ref = in;
                                    return true;
                                });

                                if (matched)
                                {
                                    if (AuGet<1>(ref) == 1)
                                    {
                                        auto handle = AuGet<2>(ref);
                                        this->handleArrayOr_[queueIterator.startingIndexOr] = handle;
                                        this->handleArrayAnd_[queueIterator.startingIndexAnd] = handle;
                                    }
                                }
                            }
                        }

                        if (!poll || failing)
                        {
                            triggeredCount++;
                            shouldRemove = true;

                            AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack

                            // A source is only removed when every consulted handler agrees
                            for (const auto &handler : source.callbacks.extended)
                            {
                                try
                                {
                                    shouldRemove &= handler->OnFinished(source.source);
                                }
                                catch (...)
                                {
                                    SysPushErrorCatch();
                                }
                            }

                            if (shouldRemove)
                            {
                                for (const auto &handler : source.callbacks.base)
                                {
                                    try
                                    {
                                        shouldRemove &= handler->OnFinished(source.source);
                                    }
                                    catch (...)
                                    {
                                        SysPushErrorCatch();
                                    }
                                }
                            }

                            if (shouldRemove)
                            {
                                AU_LOCK_GUARD(this->sourceMutex_); // TODO...
                                for (const auto &handler : globalSubcribers_)
                                {
                                    try
                                    {
                                        shouldRemove &= handler->OnFinished(source.source);
                                    }
                                    catch (...)
                                    {
                                        SysPushErrorCatch();
                                    }
                                }
                            }

                            // Sources without any callbacks of their own are kept
                            if ((source.callbacks.base.empty()) &&
                                (source.callbacks.extended.empty()))
                            {
                                shouldRemove = false;
                            }
                        }
                    }
                }

                if (shouldRemove)
                {
                    if (trigger)
                    {
                        AuTryInsert(*trigger, source.source);
                    }
                }
                else
                {
                    if (source.ConsiderTimeout(now))
                    {
                        shouldRemove = true;
                    }
                }

                source.source->OnFinishSleep();

                if (shouldRemove)
                {
                    if (source.source->GetType() == ELoopSource::eSourceWin32)
                    {
                        SysPanic("?");
                    }
                    else
                    {
                        AuUInt uHandles = (source.source->Singular() ? 1 : source.source->GetHandles().size());

                        if (this->handleArrayOr_.size() > MAXIMUM_WAIT_OBJECTS)
                        {
                            // NOTE(review): RemoveSourceNB erases from loopSourceExs_ through its own
                            // iterator while this loop still holds one - confirm Iterator tolerates this
                            this->RemoveSourceNB(source.source);
                            this->willCommitInFuture_ = true;
                        }
                        else
                        {
                            AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr + indexOffset, uHandles);
                            AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd + indexOffset, uHandles);
                            queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
                            continue;
                        }
                    }
                }

                //else
                {
                    queueIterator.Next();
                }
            }
        }
    }
    else
    {
        // Nothing triggered: only look for sources whose timeout has elapsed
        ConsiderEvicitingTimeoutsAll();
    }

    StartUserAndTakeOwn_Release();

    if (TryPumpWin32())
    {
        triggeredCount++;
    }

    return triggeredCount;
}
void LoopQueue::StartUserAndTakeOwn()
{
this->rwMutex_->UpgradeReadToWrite(0);
this->isCommitableInFuture_ = true;
}
bool LoopQueue::TryPumpWin32()
{
bool bMsgPump {};
if (!this->msgSource_)
{
return false;
}
if (AuStaticCast<Win32Dummy>(this->msgSource_)->bIsPumping_)
{
MSG msg;
try
{
while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE))
{
TranslateMessage(&msg);
DispatchMessageW(&msg);
bMsgPump = true;
}
}
catch (...)
{
SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception.");
}
}
else
{
MSG msg;
if (PeekMessageW(&msg, NULL, 0, 0, PM_NOREMOVE))
{
bMsgPump = true;
}
}
// Notify all and remove if unwanted
if (bMsgPump)
{
AU_LOCK_GUARD(this->sourceMutex_);
bool bShouldRemove = true;
for (const auto &handler : this->msgCallbacks_.base)
{
try
{
bShouldRemove &= handler->OnFinished(this->msgSource_);
}
catch (...)
{
SysPushErrorCatch();
}
}
for (const auto &handler : this->globalSubcribers_)
{
try
{
bShouldRemove &= handler->OnFinished(this->msgSource_);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (bShouldRemove)
{
this->bIsWinLoop_ = false;
}
}
return bMsgPump;
}
// Counterpart of StartUserAndTakeOwn: stops deferring commits, runs a commit
// that was requested in the meantime, and downgrades the write lock back to a
// read lock.
//
// Fix: the deferred-commit request is now consumed. willCommitInFuture_ was
// never cleared, so once it had been set every subsequent release ran
// CommitLocked() again. If the commit fails the request is re-armed, which
// keeps the old retry-on-next-release behaviour for that case.
void LoopQueue::StartUserAndTakeOwn_Release()
{
    this->isCommitableInFuture_ = false;

    if (this->willCommitInFuture_)
    {
        this->willCommitInFuture_ = false;

        if (!this->CommitLocked())
        {
            this->willCommitInFuture_ = true;
        }
    }

    this->rwMutex_->DowngradeWriteToRead();
}
// Evicts every committed source whose timeout has elapsed and calls
// OnFinishSleep on all sources.
// The caller must own the handle arrays (every caller in this file invokes this
// between StartUserAndTakeOwn and StartUserAndTakeOwn_Release).
//
// Fix: the loop used to `break` after the first eviction, so sources behind it
// were neither checked for timeouts nor given their OnFinishSleep call. It now
// continues, using the same erase-then-continue pattern as WaitAll.
void LoopQueue::ConsiderEvicitingTimeoutsAll()
{
    Iterator queueIterator(this);
    bool bRebuildFromAnd {};
    auto now = AuTime::CurrentInternalClockMS();

    for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
    {
        auto &source = *queueIterator.itr;

        if (!source.ConsiderTimeout(now))
        {
            queueIterator.Next();
            source.source->OnFinishSleep();
            continue;
        }

        source.source->OnFinishSleep();

        if (source.source->GetType() == ELoopSource::eSourceWin32)
        {
            // Null message loop hack
            this->msgSource_.reset();
            this->msgCallbacks_ = {};
            this->bIsWinLoop_ = false;
        }
        else
        {
            auto count = source.source->Singular() ? 1 : source.source->GetHandles().size();

            if (this->handleArrayOr_.size() <= MAXIMUM_WAIT_OBJECTS)
            {
                AuRemoveRange(this->handleArrayOr_, queueIterator.startingIndexOr, count);
                AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
            }
            else
            {
                // The wait-any array is regenerated once after the loop
                bRebuildFromAnd = true;
                AuRemoveRange(this->handleArrayAnd_, queueIterator.startingIndexAnd, count);
            }

            // Delete() receives the iterator returned by erase; keep scanning from there
            queueIterator.Delete(this->loopSourceExs_.erase(queueIterator.itr));
            continue;
        }

        queueIterator.Next();
    }

    if (bRebuildFromAnd)
    {
        BuildFromAndArray();
    }
}
// Exported factory: creates a LoopQueue and returns it as an ILoopQueue.
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
{
    AuSPtr<ILoopQueue> pQueue = AuMakeShared<LoopQueue>();
    return pQueue;
}
}