AuroraRuntime/Source/Loop/LoopQueue.NT.cpp

863 lines
24 KiB
C++
Raw Normal View History

[*/+/-] MEGA COMMIT. ~2 weeks compressed. The intention is to quickly improve and add util apis, enhance functionality given current demands, go back to the build pipeline, finish that, publish runtime tests, and then use what we have to go back to to linux support with a more stable api. [+] AuMakeSharedArray [+] Technet ArgvQuote [+] Grug subsystem (UNIX signal thread async safe ipc + telemetry flusher + log flusher.) [+] auEndianness -> Endian swap utils [+] AuGet<N>(...) [*] AUE_DEFINE conversion for ECompresionType, EAnsiColor, EHashType, EStreamError, EHexDump [+] ConsoleMessage ByteBuffer serialization [+] CmdLine subsystem for parsing command line arguments and simple switch/flag checks [*] Split logger from console subsystem [+] StartupParameters -> A part of a clean up effort under Process [*] Refactor SysErrors header + get caller hack [+] Atomic APIs [+] popcnt [+] Ring Buffer sink [+] Added more standard errors Catch, Submission, LockError, NoAccess, ResourceMissing, ResourceLocked, MalformedData, InSandboxContext, ParseError [+] Added ErrorCategorySet, ErrorCategoryClear, GetStackTrace [+] IExitSubscriber, ETriggerLevel [*] Write bias the high performance RWLockImpl read-lock operation operation [+] ExitHandlerAdd/ExitHandlerRemove (exit subsystem) [*] Updated API style Digests [+] CpuId::CpuBitCount [+] GetUserProgramsFolder [+] GetPackagePath [*] Split IStreamReader with an inl file [*] BlobWriter/BlobReader/BlobArbitraryReader can now take shared pointers to bytebuffers. 
default constructor allocates a new scalable bytebuffer [+] ICharacterProvider [+] ICharacterProviderEx [+] IBufferedCharacterConsumer [+] ProviderFromSharedString [+] ProviderFromString [+] BufferConsumerFromProvider [*] Parse Subsystem uses character io bufferer [*] Rewritten NT's high perf semaphore to use userland SRW/ConVars [like mutex, based on generic semaphore] [+] ByteBuffer::ResetReadPointer [*] Bug fix bytebuffer base not reset on free and some scaling issues [+] ProcessMap -> Added kSectionNameStack, kSectionNameFile, kSectionNameHeap for Section [*] ProcessMap -> Refactor Segment to Section. I was stupid for keeping a type conflict hack API facing [+] Added 64 *byte* fast RNG seeds [+] File Advisorys/File Lock Awareness [+] Added extended IAuroraThread from OS identifier caches for debug purposes [*] Tweaked how memory is reported on Windows. Better consistency of what values mean across functions. [*] Broke AuroraUtils/Typedefs out into a separate library [*] Update build script [+] Put some more effort into adding detail to the readme before rewriting it, plus, added some media [*] Improved public API documentation [*] Bug fix `SetConsoleCtrlHandler` [+] Locale TimeDateToFileNameISO8601 [+] Console config stdOutShortTime [*] Begin using internal UTF8/16 decoders when platform support isnt available (instead of stl) [*] Bug fixes in decoders [*] Major bug fix, AuMax [+] RateLimiter [+] Binary file sink [+] Log directory sink [*] Data header usability (more operators) [+] AuRemoveRange [+] AuRemove [+] AuTryRemove [+] AuTryRemoveRange [+] auCastUtils [+] Finish NewLSWin32Source [+] AuTryFindByTupleN, AuTryRemoveByTupleN [+] Separated AuRead/Write types, now in auTypeUtils [+] Added GetPosition/SetPosition to FileWriter [*] Fix stupid AuMin in place of AuMax in SpawnThread.Unix.Cpp [*] Refactored Arbitrary readers to SeekingReaders (as in, they could be atomic and/or parallelized, and accept an arbitrary position as a work parameter -> not Seekable, as 
in, you can simply set the position) [*] Hack back in the sched deinit [+] File AIO loop source interop [+] Begin to prototype a LoopQueue object I had in mind for NT, untested btw [+] Stub code for networking [+] Compression BaseStream/IngestableStreamBase [*] Major: read/write locks now support write-entrant read routines. [*] Compression subsystem now uses the MemoryView concept [*] Rewrite the base stream compressions, made them less broken [*] Update hashing api [*] WriterTryGoForward and ReaderTryGoForward now revert to the previous relative index instead of panicing [+] Added new AuByteBuffer apis Trim, Pad, WriteFrom, WriteString, [TODO: ReadString] [+] Added ByteBufferPushReadState [+] Added ByteBufferPushWriteState [*] Move from USC-16 to full UTF-16. Win32 can handle full UTF-16. [*] ELogLevel is now an Aurora enum [+] Raised arbitrary limit in header to 255, the max filter buffer [+] Explicit GZip support [+] Explicit Zip support [+] Added [some] compressors et al
2022-02-17 00:11:40 +00:00
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: LoopQueue.NT.cpp
Date: 2022-1-8
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Loop.NT.hpp"
#include "ILoopSourceEx.hpp"
#include "LSWin32.NT.hpp"
#include "LoopQueue.NT.hpp"
// Classifies a Win32 wait return code.
// Returns false for WAIT_TIMEOUT, WAIT_IO_COMPLETION and WAIT_FAILED; true for every other value.
static bool WaitToRetStatus(DWORD ret)
{
    switch (ret)
    {
    case WAIT_TIMEOUT:
    case WAIT_IO_COMPLETION:
    case WAIT_FAILED:
        return false;
    default:
        return true;
    }
}
// Like WaitToRetStatus, but when `active` is false a result of exactly WAIT_OBJECT_0 is also
// rejected: callers place the queue's "lock releaser" event in that slot and pass
// active = (no such event exists).
static bool WaitStatusFromAligned(DWORD ret, bool active)
{
    if (!WaitToRetStatus(ret))
    {
        return false;
    }

    if (active)
    {
        return true;
    }

    return ret != WAIT_OBJECT_0 /*lock releaser*/;
}
namespace Aurora::Loop
{
// Creates the unnamed, auto-reset, initially non-signaled event that Sync() uses to kick waiters.
LoopQueue::LoopQueue()
{
    this->hEvent_ = CreateEventW(NULL, false, false, NULL);

    // CreateEventW reports failure with NULL, whereas the rest of this file tests hEvent_
    // against INVALID_HANDLE_VALUE. Normalise so those checks actually detect a missing event.
    if (!this->hEvent_)
    {
        this->hEvent_ = INVALID_HANDLE_VALUE;
    }
}
// Releases the event handle created by the constructor.
LoopQueue::~LoopQueue()
{
    AuWin32CloseHandle(hEvent_);
}
// True when the queue has its event handle and at least one handle in the cached wait array.
bool LoopQueue::IsValid()
{
    if (this->hEvent_ == INVALID_HANDLE_VALUE)
    {
        return false;
    }

    return !this->handleArray_.empty();
}
struct LoopJoinable
{
LoopQueue *that;
void Lock()
{
that->Sync();
}
void Unlock()
{
that->Unlock();
}
};
// Acquires the write side of rwMutex_.
// When the wake-up event exists it is signalled first, so a thread blocked in one of the Wait*
// calls (which put hEvent_ in their wait set while holding the read side) gets woken;
// the event is reset again once the write lock has been obtained.
void LoopQueue::Sync()
{
    // Fix: the original tested `!=` here, which skipped the wake-up whenever the event was valid
    // and called SetEvent/ResetEvent on INVALID_HANDLE_VALUE otherwise.
    if (this->hEvent_ == INVALID_HANDLE_VALUE)
    {
        this->rwMutex_->AsWritable()->Lock();
        return;
    }

    SetEvent(this->hEvent_);
    this->rwMutex_->AsWritable()->Lock();
    ResetEvent(this->hEvent_);
}
// Releases the write side of rwMutex_ taken by Sync().
void LoopQueue::Unlock()
{
    auto writeSide = this->rwMutex_->AsWritable();
    writeSide->Unlock();
}
// Registers `source` with a stored timeout value of 0. Takes sourceMutex_.
// Returns whether the insertion into sources_ succeeded.
bool LoopQueue::SourceAdd(const AuSPtr<ILoopSource> &source)
{
    AU_LOCK_GUARD(this->sourceMutex_);

    const bool inserted = AuTryInsert(this->sources_, AuMakeTuple(source, 0));
    return inserted;
}
// Registers `source` together with an absolute timeout of (current internal clock ms + maxTimeout).
// Takes sourceMutex_. Returns whether the insertion into sources_ succeeded.
bool LoopQueue::SourceAddWithTimeout(const AuSPtr<ILoopSource> &source, AuUInt32 maxTimeout)
{
    AU_LOCK_GUARD(this->sourceMutex_);

    const bool inserted = AuTryInsert(this->sources_, AuMakeTuple(source, AuTime::CurrentInternalClockMS() + maxTimeout));
    return inserted;
}
// Queues `source` for removal; the removal itself is carried out by Commit().
// Takes sourceMutex_. Returns whether the insertion into removedSources_ succeeded.
bool LoopQueue::SourceRemove(const AuSPtr<ILoopSource> &source)
{
    AU_LOCK_GUARD(this->sourceMutex_);

    const bool queued = AuTryInsert(this->removedSources_, source);
    return queued;
}
// Removes `source` from the queue's bookkeeping without blocking on the loop lock.
//
// source        - the source to drop. A null pointer is only removed from sources_.
// removeHandles - when true, the source's OS handles are also cut out of handleArray_.
//
// Takes sourceMutex_ for the sources_ update. The cached arrays (loopSourceExs_, handleArray_)
// are expected to be protected by the caller (Commit() holds the LoopJoinable lock).
//
// Fixes relative to the previous revision:
//  * the handle walk ran after the entry had already been erased from loopSourceExs_, and
//    compared the wrong member, so no handle was ever removed;
//  * the range start was captured before skipping the per-64 event slot (off by one);
//  * a null `source` was dereferenced.
void LoopQueue::RemoveSourceNB(const AuSPtr<ILoopSource> &source, bool removeHandles)
{
    AU_LOCK_GUARD(this->sourceMutex_); // me
    AuTryRemoveByTupleN<0>(this->sources_, source); // guarded by ^

    const bool bIsWin32 = source && (source->GetType() == ELoopSource::eSourceWin32);

    // Locate and drop the handle range first, while the entry still exists in loopSourceExs_.
    if (source && removeHandles && !bIsWin32)
    {
        AuUInt handleIndex {};

        for (const auto &sourceEx : this->loopSourceExs_)
        {
            // Same layout walk as WaitAnyNBSpurious: one slot is skipped at every 64-handle boundary
            if ((handleIndex % 64) == 0)
            {
                handleIndex++;
            }

            const auto delta = sourceEx.source->GetHandles().size();

            if (sourceEx.sourceBase == source)
            {
                AuRemoveRange(this->handleArray_, handleIndex, delta);
                // Everything behind the removed range shifted down; do not advance the cursor
                continue;
            }

            handleIndex += delta;
        }
    }

    AuRemoveIf(this->loopSourceExs_, [&source](const auto &re)
    {
        return re.sourceBase == source;
    }); // guarded by the loop joinable

    if (bIsWin32)
    {
        this->msgSource_ = {};
        this->msgCallbacks_ = {};
        this->bIsWinLoop_ = false;
    }
}
// Applies pending removals and rebuilds the cached wait state (loopSourceExs_ / handleArray_)
// from sources_.
//
// Runs under the LoopJoinable lock (Sync()/Unlock()).
// Returns true on success. If anything throws, both caches are cleared and false is returned.
//
// Fixes relative to the previous revision:
//  * removedSources_ was cleared inside the range-for that iterates it (iterator invalidation);
//  * the follow-up "sources empty and removals pending" early return could never be taken once
//    the list had been cleared; it is dropped, and the rebuild below yields empty caches for
//    an empty sources_ and still returns true;
//  * removedSources_ is now snapshotted under sourceMutex_, the mutex SourceRemove() uses to fill it.
bool LoopQueue::Commit()
{
    LoopJoinable joinable {this};
    AU_LOCK_GUARD(joinable);

    try
    {
        // Take the pending removals out under the lock. RemoveSourceNB acquires sourceMutex_
        // itself, so the lock must not be held while calling it.
        auto pendingRemovals = this->removedSources_;
        {
            AU_LOCK_GUARD(this->sourceMutex_);
            pendingRemovals = this->removedSources_;
            this->removedSources_.clear();
        }

        for (const auto &re : pendingRemovals)
        {
            RemoveSourceNB(re, this->sources_.empty());
        }

        // Reset relevant OS cache
        this->bIsWinLoop_ = false;
        this->bIsThreadSafe_ = false;
        AuTryClear(this->loopSourceExs_);
        AuTryClear(this->handleArray_);

        // Reserve the cache arrays for initialization
        this->loopSourceExs_.reserve(this->sources_.size());
        this->handleArray_.reserve(this->sources_.size());

        for (const auto &[source, timeout] : sources_)
        {
            // Filter bad sources
            if (!source)
            {
                continue;
            }

            // Win32 edge case: the message queue has no handle, it is tracked separately
            if (source->GetType() == ELoopSource::eSourceWin32)
            {
                this->bIsWinLoop_ = true;
                this->msgSource_ = source;
                continue;
            }

            // Handle known ILoopSourceEx handle objects
            if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
            {
                ExtendeSourceInfo t {extended};
                t.timeoutAbs = timeout;
                t.sourceBase = source;
                t.source = extended;
                this->loopSourceExs_.push_back(t);

                // Every MAXIMUM_WAIT_OBJECTS-sized chunk starts with the wake-up event, if there is one
                if ((this->handleArray_.size() % MAXIMUM_WAIT_OBJECTS) == 0)
                {
                    if (this->hEvent_ != INVALID_HANDLE_VALUE)
                    {
                        this->bIsThreadSafe_ = true;
                        this->handleArray_.push_back(hEvent_);
                    }
                }

                for (const auto &handle : extended->GetHandles())
                {
                    auto nthandle = reinterpret_cast<HANDLE>(handle);
                    this->handleArray_.push_back(nthandle);
                }
            }
        }

        return true;
    }
    catch (...)
    {
        AuTryClear(this->loopSourceExs_);
        AuTryClear(this->handleArray_);
        return {};
    }
}
// Number of entries currently held in the committed loopSourceExs_ cache.
AuUInt32 LoopQueue::GetSourceCount()
{
    const auto committed = this->loopSourceExs_.size();
    return committed;
}
// Configures the chunked ("chug") wait path.
// sectionTickTime   - per-chunk wait in milliseconds; 0 is replaced by 1.
// sectionDequeCount - stored in slowTickRate_, capped at MAXIMUM_WAIT_OBJECTS.
void LoopQueue::ChugPathConfigure(AuUInt32 sectionTickTime, AuSInt sectionDequeCount)
{
    this->slowTickRate_ = AuMin(sectionDequeCount, AuSInt(MAXIMUM_WAIT_OBJECTS));

    if (sectionTickTime)
    {
        this->slowTickMs_ = sectionTickTime;
    }
    else
    {
        this->slowTickMs_ = 1;
    }
}
void LoopQueue::ChugHint(bool value)
{
this->forceChug_ = value;
}
bool LoopQueue::AddCallback(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
AU_LOCK_GUARD(this->sourceMutex_);
return AuTryInsert(this->msgCallbacks_, subscriber);
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.specificSubscribers, subscriber))
{
return {};
}
count++;
}
return count;
}
bool LoopQueue::AddCallbackEx(const AuSPtr<ILoopSource> &source, const AuSPtr<ILoopSourceSubscriberEx> &subscriber)
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
if (!source)
{
SysPushErrorArg();
return {};
}
if (!subscriber)
{
SysPushErrorArg();
return {};
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
return false;
}
AuUInt32 count {};
for (auto &sourceEx : this->loopSourceExs_)
{
if (sourceEx.sourceBase != source)
{
continue;
}
AU_LOCK_GUARD(sourceEx.lock);
if (!AuTryInsert(sourceEx.timeoutSubscribers, subscriber))
{
return {};
}
count++;
}
return count;
}
// Attaches `subscriber` as a global subscriber: to every committed entry's globalSubscribers
// list and to the Win32 message callbacks.
// Holds the read side of rwMutex_ for the duration of the call.
// Returns false on a null subscriber or on the first failed insertion.
bool LoopQueue::AddCallback(const AuSPtr<ILoopSourceSubscriber> &subscriber)
{
    AU_LOCK_GUARD(rwMutex_->AsReadable());

    if (!subscriber)
    {
        SysPushErrorArg();
        return {};
    }

    // Original author's rationale, kept: taking each entry's spinlock looks excessive, but it is
    // cheap, whereas taking rwMutex_ for writing would spuriously wake every Wait[...]Nb call.
    // This is meant to be called during work initialisation rather than from wait/commit paths.
    for (auto &entry : this->loopSourceExs_)
    {
        AU_LOCK_GUARD(entry.lock); // hack lock, but it's a spinlock

        const bool inserted = AuTryInsert(entry.globalSubscribers, subscriber);
        if (!inserted)
        {
            return {};
        }
    }

    {
        AU_LOCK_GUARD(this->sourceMutex_);
        return AuTryInsert(this->msgCallbacks_, subscriber);
    }
}
bool LoopQueue::HasFinished()
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
AuUInt32 askers {};
return WaitAnyNBSpurious(INFINITE, askers, nullptr, true) == this->sources_.size();
}
bool LoopQueue::IsSignaled()
{
AU_LOCK_GUARD(this->rwMutex_->AsReadable());
AuUInt32 askers {};
return WaitAnyNBSpurious(0, askers, nullptr, true);
}
// Waits until every handle in handleArray_ has been signaled, in chunks of at most
// MAXIMUM_WAIT_OBJECTS handles, or until `timeout` milliseconds have elapsed.
//
// Returns true when all chunks completed (or there is nothing to wait on), false on timeout
// or when the OS wait fails. Sources whose own timeout elapsed are evicted before returning
// from the timeout and end-of-loop paths.
//
// Fix relative to the previous revision: after a chunk completed the cursor was advanced by
// the total handle count instead of the chunk size, so any queue with more than
// MAXIMUM_WAIT_OBJECTS handles ran past the end of handleArray_ and never terminated.
//
// NOTE(review): the deadline arithmetic is done in AuUInt32; confirm that matches the range of
// AuTime::CurrentInternalClockMS().
// NOTE(review): MsgWaitForMultipleObjectsEx accepts at most MAXIMUM_WAIT_OBJECTS - 1 handles;
// a full 64-handle chunk on the Win32 path will fail - confirm.
bool LoopQueue::WaitAll(AuUInt32 timeout)
{
    AU_LOCK_GUARD(this->rwMutex_->AsReadable());

    bool bReturnStatus {true};
    AuUInt32 count {};
    AuUInt32 index {};

    if (this->handleArray_.empty())
    {
        return true;
    }

    count = this->handleArray_.size();

    AuUInt32 startTime = AuTime::CurrentInternalClockMS();
    AuUInt32 endTime = startTime + timeout;

    // "active" means there is no wake-up event occupying slot 0 of each chunk
    bool active = this->hEvent_ == INVALID_HANDLE_VALUE;

    while (count != index)
    {
        auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));

        startTime = AuTime::CurrentInternalClockMS();
        if (endTime <= startTime)
        {
            ConsiderEvicitingTimeoutsAll();
            return false;
        }

        auto timeDelta = endTime - startTime; // TODO: cap to last obj

        DWORD status {};
        if (this->bIsWinLoop_)
        {
            status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, timeDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE | MWMO_WAITALL);
        }
        else
        {
            status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, true, timeDelta, true);
        }

        if (WaitStatusFromAligned(status, active))
        {
            // This chunk is done; continue the sleep with the next MAXIMUM_WAIT_OBJECTS or [..., EOS] objects
            index += next;
            continue;
        }

        // Account for updates
        if (status == WAIT_OBJECT_0 && !active)
        {
            count = this->handleArray_.size();

            if (count == 0)
            {
                return true;
            }
            else if (index > count)
            {
                index = 0;
            }

            continue;
        }

        // Account for win32 failures
        if (status == WAIT_FAILED)
        {
            bReturnStatus = false;
            break;
        }

        // Ignore the other unrelated results (APC notification, timeout, etc) and retry
    }

    ConsiderEvicitingTimeoutsAll();
    return bReturnStatus;
}
// Repeatedly runs WaitAnyNBSpurious, counting the passes that reported a trigger, for as long
// as a zero-timeout wait on hEvent_ returns WAIT_OBJECT_0 after each pass.
// timeout - relative milliseconds; 0 is forwarded as INFINITE, anything else as
//           (current internal clock ms + timeout).
// Returns the number of passes that reported a trigger.
AuUInt32 LoopQueue::WaitAny(AuUInt32 timeout)
{
    AuUInt32 signaledPasses {};
    const AuUInt32 began = AuTime::CurrentInternalClockMS();
    const AuUInt32 deadline = timeout ? (began + timeout) : INFINITE;
    AuUInt32 chugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(deadline, chugCursor, nullptr, false))
        {
            signaledPasses++;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return signaledPasses;
}
// Same loop as WaitAny, but collects the triggered sources and stops at the first pass that
// reports a trigger.
// timeout - relative milliseconds; 0 is forwarded as INFINITE, anything else as
//           (current internal clock ms + timeout).
// Returns the list filled in by WaitAnyNBSpurious (possibly empty).
AuList<AuSPtr<ILoopSource>> LoopQueue::WaitAnyEx(AuUInt32 timeout)
{
    AuList<AuSPtr<ILoopSource>> trigger;
    const AuUInt32 began = AuTime::CurrentInternalClockMS();
    const AuUInt32 deadline = timeout ? (began + timeout) : INFINITE;
    AuUInt32 chugCursor {};

    for (;;)
    {
        if (WaitAnyNBSpurious(deadline, chugCursor, &trigger, false))
        {
            break;
        }

        if (WaitForSingleObject(this->hEvent_, 0) != WAIT_OBJECT_0)
        {
            break;
        }
    }

    return trigger;
}
// Chunked wait-any: walks handleArray_ in chunks of at most MAXIMUM_WAIT_OBJECTS handles,
// waiting slowTickMs_ on each chunk, starting at `chuggerIndex`.
//
// timeout          - absolute deadline in internal-clock milliseconds; 0 or INFINITE means none.
// chuggerIndex     - in/out cursor into handleArray_; restarted from 0 once it reaches the end.
// indexOfTriggered - out; index into handleArray_ of the signaled handle, handleArray_.size()
//                    when Win32 message input woke the wait, or -1 when nothing was identified.
//
// Returns true when a handle (or message input) was signaled; false on deadline expiry, when
// the wake-up event fired, on WAIT_FAILED, or after one full pass without a signal.
//
// Fixes relative to the previous revision:
//  * the cursor advanced by the total handle count rather than the chunk size, so chunks after
//    the first were never waited on and the cursor overshot the array;
//  * a cursor equal to the handle count was never restarted;
//  * the message-input result returned without setting indexOfTriggered (left at -1);
//  * WAIT_ABANDONED_0 + i was turned into an out-of-range index;
//  * the INFINITE sentinel passed by WaitAny was compared as if it were a real deadline.
bool LoopQueue::ChugWaitAny(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuUInt32 &indexOfTriggered)
{
    AuUInt32 &index {chuggerIndex};
    const AuUInt32 count = this->handleArray_.size();

    indexOfTriggered = -1;

    if (index >= count)
    {
        index = 0;
    }

    // "active" means there is no wake-up event occupying slot 0 of each chunk
    const bool active = this->hEvent_ == INVALID_HANDLE_VALUE;
    const bool bHasDeadline = timeout && (timeout != INFINITE);

    while (index < count)
    {
        const auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));

        // Callers build the deadline in AuUInt32, so compare in the same width.
        // NOTE(review): confirm against the return type of AuTime::CurrentInternalClockMS().
        if (bHasDeadline && timeout <= static_cast<AuUInt32>(AuTime::CurrentInternalClockMS()))
        {
            return false;
        }

        DWORD status {};
        if (this->bIsWinLoop_)
        {
            status = ::MsgWaitForMultipleObjectsEx(next, this->handleArray_.data() + index, this->slowTickMs_, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);

            // WAIT_OBJECT_0 + nCount: message input is available
            if (status == WAIT_OBJECT_0 + next)
            {
                indexOfTriggered = count; // WaitAnyNBSpurious treats handleArray_.size() as "pump"
                return true;
            }
        }
        else
        {
            status = ::WaitForMultipleObjectsEx(next, this->handleArray_.data() + index, false, this->slowTickMs_, true);
        }

        if (WaitStatusFromAligned(status, active))
        {
            DWORD offset = status - WAIT_OBJECT_0;

            // An abandoned mutex still identifies a handle, relative to WAIT_ABANDONED_0
            if ((status >= WAIT_ABANDONED_0) && (status < WAIT_ABANDONED_0 + next))
            {
                offset = status - WAIT_ABANDONED_0;
            }

            if (offset < next)
            {
                indexOfTriggered = offset + index;
            }

            return true;
        }

        // The wake-up event fired: hand control back so the lock holder can proceed
        if (status == WAIT_OBJECT_0 && !active)
        {
            return false;
        }

        if (status == WAIT_FAILED)
        {
            return false;
        }

        // Timeout / APC on this chunk: move on to the next one
        index += next;
    }

    return false;
}
bool LoopQueue::WaitAnyNBSpurious(AuUInt32 timeout, AuUInt32 &chuggerIndex, AuList<AuSPtr<ILoopSource>> *trigger, bool poll)
{
bool status {};
DWORD temp;
AuUInt32 indexOfTriggered {};
AuUInt32 triggeredCount {};
AU_LOCK_GUARD(this->rwMutex_->AsReadable()); // the spurious wake up comes from an event that tells all to release me
if (trigger)
{
trigger->reserve(this->sources_.size());
}
for (const auto &source : this->loopSourceExs_)
{
source.source->OnPresleep();
}
if ((this->handleArray_.size() > MAXIMUM_WAIT_OBJECTS) || (this->forceChug_))
{
status = ChugWaitAny(timeout, chuggerIndex, indexOfTriggered);
}
else
{
auto now = AuTime::CurrentInternalClockMS();
if (timeout && timeout <= now)
{
return false;
}
auto sleepDelta = timeout - now;
if (this->bIsWinLoop_)
{
temp = ::MsgWaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), sleepDelta, QS_ALLPOSTMESSAGE | QS_ALLINPUT | QS_ALLEVENTS, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
}
else
{
temp = ::WaitForMultipleObjectsEx(this->handleArray_.size(), this->handleArray_.data(), false, sleepDelta, true);
}
status = WaitToRetStatus(temp);
if (status)
{
indexOfTriggered = temp - WAIT_OBJECT_0;
}
}
bool isPump = this->handleArray_.size() == indexOfTriggered;
AuUInt firstTriggered {};
if (status)
{
if (!isPump)
{
firstTriggered = reinterpret_cast<AuUInt>(this->handleArray_[indexOfTriggered]);
}
AuUInt handleIndex {};
for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; )
{
auto &source = *itr;
AuUInt lastHandle {};
bool wasTriggered {};
auto handles = source.source->GetHandles();
if ((handleIndex % 64) == 0)
{
handleIndex++;
}
auto lsStartIndex = handleIndex;
for (const auto &handle : handles)
{
handleIndex++;
if ((firstTriggered == handle) ||
(::WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
{
lastHandle = handle;
wasTriggered = true;
break;
}
}
bool shouldRemove {false};
if (wasTriggered && source.source->OnTrigger(lastHandle))
{
triggeredCount++;
shouldRemove = true;
AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack
for (const auto &handler : source.specificSubscribers)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (shouldRemove)
{
for (const auto &handler : source.timeoutSubscribers)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if (shouldRemove)
{
for (const auto &handler : source.globalSubscribers)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if ((source.specificSubscribers.empty()) &&
(source.globalSubscribers.empty()) &&
(source.timeoutSubscribers.empty()))
{
shouldRemove = false;
}
}
if (shouldRemove)
{
if (trigger)
{
AuTryInsert(*trigger, source.source);
}
}
if (source.ConsiderTimeout())
{
shouldRemove = true;
}
source.source->OnFinishSleep();
if (shouldRemove)
{
// Evict from OS cache
AuRemoveRange(this->handleArray_, lsStartIndex, handles.size());
{
AU_LOCK_GUARD(this->sourceMutex_);
AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase);
}
if (source.source->GetType() == ELoopSource::eSourceWin32)
{
this->msgSource_ = {};
this->msgCallbacks_ = {};
this->bIsWinLoop_ = false;
}
itr = this->loopSourceExs_.erase(itr);
handleIndex -= handles.size();
}
else
{
itr++;
}
}
}
else
{
ConsiderEvicitingTimeoutsAll();
}
if (isPump)
{
triggeredCount++;
// Notify all and remove if unwanted
{
AU_LOCK_GUARD(this->sourceMutex_);
2022-03-11 04:07:15 +00:00
if (trigger)
{
AuTryInsert(*trigger, this->msgSource_);
}
[*/+/-] MEGA COMMIT. ~2 weeks compressed. The intention is to quickly improve and add util apis, enhance functionality given current demands, go back to the build pipeline, finish that, publish runtime tests, and then use what we have to go back to to linux support with a more stable api. [+] AuMakeSharedArray [+] Technet ArgvQuote [+] Grug subsystem (UNIX signal thread async safe ipc + telemetry flusher + log flusher.) [+] auEndianness -> Endian swap utils [+] AuGet<N>(...) [*] AUE_DEFINE conversion for ECompresionType, EAnsiColor, EHashType, EStreamError, EHexDump [+] ConsoleMessage ByteBuffer serialization [+] CmdLine subsystem for parsing command line arguments and simple switch/flag checks [*] Split logger from console subsystem [+] StartupParameters -> A part of a clean up effort under Process [*] Refactor SysErrors header + get caller hack [+] Atomic APIs [+] popcnt [+] Ring Buffer sink [+] Added more standard errors Catch, Submission, LockError, NoAccess, ResourceMissing, ResourceLocked, MalformedData, InSandboxContext, ParseError [+] Added ErrorCategorySet, ErrorCategoryClear, GetStackTrace [+] IExitSubscriber, ETriggerLevel [*] Write bias the high performance RWLockImpl read-lock operation operation [+] ExitHandlerAdd/ExitHandlerRemove (exit subsystem) [*] Updated API style Digests [+] CpuId::CpuBitCount [+] GetUserProgramsFolder [+] GetPackagePath [*] Split IStreamReader with an inl file [*] BlobWriter/BlobReader/BlobArbitraryReader can now take shared pointers to bytebuffers. 
default constructor allocates a new scalable bytebuffer [+] ICharacterProvider [+] ICharacterProviderEx [+] IBufferedCharacterConsumer [+] ProviderFromSharedString [+] ProviderFromString [+] BufferConsumerFromProvider [*] Parse Subsystem uses character io bufferer [*] Rewritten NT's high perf semaphore to use userland SRW/ConVars [like mutex, based on generic semaphore] [+] ByteBuffer::ResetReadPointer [*] Bug fix bytebuffer base not reset on free and some scaling issues [+] ProcessMap -> Added kSectionNameStack, kSectionNameFile, kSectionNameHeap for Section [*] ProcessMap -> Refactor Segment to Section. I was stupid for keeping a type conflict hack API facing [+] Added 64 *byte* fast RNG seeds [+] File Advisorys/File Lock Awareness [+] Added extended IAuroraThread from OS identifier caches for debug purposes [*] Tweaked how memory is reported on Windows. Better consistency of what values mean across functions. [*] Broke AuroraUtils/Typedefs out into a separate library [*] Update build script [+] Put some more effort into adding detail to the readme before rewriting it, plus, added some media [*] Improved public API documentation [*] Bug fix `SetConsoleCtrlHandler` [+] Locale TimeDateToFileNameISO8601 [+] Console config stdOutShortTime [*] Begin using internal UTF8/16 decoders when platform support isnt available (instead of stl) [*] Bug fixes in decoders [*] Major bug fix, AuMax [+] RateLimiter [+] Binary file sink [+] Log directory sink [*] Data header usability (more operators) [+] AuRemoveRange [+] AuRemove [+] AuTryRemove [+] AuTryRemoveRange [+] auCastUtils [+] Finish NewLSWin32Source [+] AuTryFindByTupleN, AuTryRemoveByTupleN [+] Separated AuRead/Write types, now in auTypeUtils [+] Added GetPosition/SetPosition to FileWriter [*] Fix stupid AuMin in place of AuMax in SpawnThread.Unix.Cpp [*] Refactored Arbitrary readers to SeekingReaders (as in, they could be atomic and/or parallelized, and accept an arbitrary position as a work parameter -> not Seekable, as 
in, you can simply set the position) [*] Hack back in the sched deinit [+] File AIO loop source interop [+] Begin to prototype a LoopQueue object I had in mind for NT, untested btw [+] Stub code for networking [+] Compression BaseStream/IngestableStreamBase [*] Major: read/write locks now support write-entrant read routines. [*] Compression subsystem now uses the MemoryView concept [*] Rewrite the base stream compressions, made them less broken [*] Update hashing api [*] WriterTryGoForward and ReaderTryGoForward now revert to the previous relative index instead of panicing [+] Added new AuByteBuffer apis Trim, Pad, WriteFrom, WriteString, [TODO: ReadString] [+] Added ByteBufferPushReadState [+] Added ByteBufferPushWriteState [*] Move from USC-16 to full UTF-16. Win32 can handle full UTF-16. [*] ELogLevel is now an Aurora enum [+] Raised arbitrary limit in header to 255, the max filter buffer [+] Explicit GZip support [+] Explicit Zip support [+] Added [some] compressors et al
2022-02-17 00:11:40 +00:00
bool shouldRemove {true};
for (const auto &handler : this->msgCallbacks_)
{
try
{
shouldRemove &= handler->OnFinished(this->msgSource_);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (shouldRemove)
{
this->bIsWinLoop_ = false;
}
}
// Do the work of pumping the Win32 event queue if the user didn't flush the queue during the OnFinished callback
{
if (AuStaticCast<Win32Dummy>(this->msgSource_)->bIsPumping_)
{
MSG msg;
try
{
while (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE))
{
TranslateMessage(&msg);
DispatchMessageW(&msg);
}
}
catch (...)
{
SysPushErrorCatch("Win32 Pump <-> Aur LoopQueue. Window handler threw a C++ exception.");
}
}
}
}
return triggeredCount;
}
// Walks the committed sources and evicts every one whose ConsiderTimeout() reports true:
// its handles are cut out of handleArray_, it is removed from sources_ (under sourceMutex_)
// and from loopSourceExs_.
//
// Fix relative to the previous revision: the handle range was removed starting at the cursor
// that had already been advanced past the source (the saved `begin` was never used), so the
// wrong handles were dropped; the cursor is now also rewound after an eviction.
void LoopQueue::ConsiderEvicitingTimeoutsAll()
{
    AuUInt32 startIndex {};

    for (auto itr = this->loopSourceExs_.begin(); this->loopSourceExs_.end() != itr; )
    {
        auto &source = *itr;

        // Skip the slot reserved at every 64-handle boundary
        if ((startIndex % 64) == 0)
        {
            startIndex++;
        }

        const auto count = source.source->GetHandles().size();
        const auto begin = startIndex;
        startIndex += count;

        if (!source.ConsiderTimeout())
        {
            itr++;
            continue;
        }

        // Evict from OS cache
        AuRemoveRange(this->handleArray_, begin, count);

        {
            AU_LOCK_GUARD(this->sourceMutex_);
            AuTryRemoveByTupleN<0>(this->sources_, source.sourceBase);
        }

        if (source.source->GetType() == ELoopSource::eSourceWin32)
        {
            this->msgSource_ = {};
            this->msgCallbacks_ = {};
            this->bIsWinLoop_ = false;
        }

        itr = this->loopSourceExs_.erase(itr);

        // The following handles shifted down into the removed range
        startIndex = begin;
    }
}
// Exported factory: allocates a LoopQueue and returns it through the ILoopQueue interface.
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()
{
    AuSPtr<ILoopQueue> queue = AuMakeShared<LoopQueue>();
    return queue;
}
}