[*] Begin updating NT's loopqueue

This commit is contained in:
Reece Wilson 2022-04-16 14:01:33 +01:00
parent 7a0d9701b5
commit 4847519624
6 changed files with 165 additions and 156 deletions

View File

@ -19,6 +19,7 @@ namespace Aurora::Loop
{
auto handle = reinterpret_cast<HANDLE>(this->handle);
AuWin32CloseHandle(handle);
this->handle = kInvalidHandle;
}
bool LSEvent::Set()

View File

@ -17,6 +17,7 @@ namespace Aurora::Loop
{
auto handle = reinterpret_cast<HANDLE>(this->handle);
AuWin32CloseHandle(handle);
this->handle = kInvalidHandle;
}
bool Mutex::Unlock()

View File

@ -17,6 +17,7 @@ namespace Aurora::Loop
{
auto handle = reinterpret_cast<HANDLE>(this->handle);
AuWin32CloseHandle(handle);
this->handle = kInvalidHandle;
}
bool LSSemaphore::AddOne()

View File

@ -911,7 +911,7 @@ namespace Aurora::Loop
}
// Notify global subscribers, allowing them to preempt removal
if (bShouldRemove)
if (bShouldRemove || bOverload)
{
AU_LOCK_GUARD(this->parent->globalLockMutex_);
for (const auto &handler : this->parent->allSubscribers_)

View File

@ -26,10 +26,150 @@ static bool WaitStatusFromAligned(DWORD ret, bool active)
namespace Aurora::Loop
{
LoopQueue::ExtendedSourceInfo::ExtendedSourceInfo(const AuSPtr<ILoopSourceEx> &in) : source(in)
{
}
bool LoopQueue::ExtendedSourceInfo::ConsiderTimeout(AuUInt64 time) const
{
if (!this->timeoutAbs)
{
return false;
}
if (time < timeoutAbs)
{
return false;
}
for (const auto &handler : callbacks.extended)
{
try
{
handler->OnTimeout(sourceBase);
}
catch (...)
{
SysPushErrorCatch();
}
}
return true;
}
AuPair<bool, bool> LoopQueue::ExtendedSourceInfo::DoWork(LoopQueue *queue, AuUInt handle)
{
bool bShouldRemove {true};
AuUInt8 uPosition {};
if (this->source && handle != -1)
{
if (!this->source->OnTrigger(handle))
{
return {};
}
}
bool bOverload {};
{
AU_LOCK_GUARD(&this->lock);
if ((this->callbacks.base.empty()) &&
(this->callbacks.extended.empty()))
{
bOverload = true;
}
// Notify callbacks...
for (auto itr = this->callbacks.extended.begin();
itr != this->callbacks.extended.end(); )
{
bool result;
auto handler = *itr;
try
{
result = handler->OnFinished(this->source, uPosition++);
}
catch (...)
{
SysPushErrorCatch();
}
bShouldRemove &= result;
if (result)
{
itr = this->callbacks.extended.erase(itr);
}
else
{
itr++;
}
}
for (auto itr = this->callbacks.base.begin();
itr != this->callbacks.base.end(); )
{
bool result;
auto handler = *itr;
try
{
result = handler->OnFinished(this->source);
}
catch (...)
{
SysPushErrorCatch();
}
bShouldRemove &= result;
if (result)
{
itr = this->callbacks.base.erase(itr);
}
else
{
itr++;
}
}
}
// Evict when subs count hit zero, not when sub count started off at zero
if (bOverload)
{
bShouldRemove = false;
}
// Notify global subscribers, allowing them to preempt removal
if (bShouldRemove || bOverload)
{
AU_LOCK_GUARD(queue->sourceMutex_);
for (const auto &handler : queue->globalSubcribers_)
{
try
{
bShouldRemove &= handler->OnFinished(this->source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
return AuMakePair(true, bShouldRemove);
}
LoopQueue::LoopQueue()
{
this->hEvent_ = CreateEventW(NULL, true, false, NULL);
this->hDummy_ = CreateEventW(NULL, true, true, NULL);
this->hEvent_ = ::CreateEventW(NULL, true, false, NULL);
this->hDummy_ = ::CreateEventW(NULL, true, true, NULL);
}
LoopQueue::~LoopQueue()
@ -65,11 +205,11 @@ namespace Aurora::Loop
return;
}
SetEvent(this->hEvent_);
::SetEvent(this->hEvent_);
this->rwMutex_->AsWritable()->Lock();
ResetEvent(this->hEvent_);
::ResetEvent(this->hEvent_);
}
void LoopQueue::Unlock()
@ -117,10 +257,11 @@ namespace Aurora::Loop
return {};
}
Iterator queueIterator(this);
bool bRebuildFromAnd {};
for ( queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
Iterator queueIterator(this);
for (queueIterator.Start();
queueIterator.End() != queueIterator.itr; )
{
if (queueIterator.itr->sourceBase == source)
{
@ -169,7 +310,6 @@ namespace Aurora::Loop
return CommitLocked();
}
void LoopQueue::BuildFromAndArray()
{
this->handleArrayOr_.clear();
@ -205,7 +345,6 @@ namespace Aurora::Loop
this->handleArrayAnd_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
this->handleArrayOr_.reserve(this->loopSourceExs_.size() + this->addedSources_.size());
bShouldRebuild |= this->addedSources_.size();
//
@ -229,7 +368,7 @@ namespace Aurora::Loop
// Handle known ILoopSourceEx handle objects
if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
{
ExtendeSourceInfo t {extended};
ExtendedSourceInfo t {extended};
t.timeoutAbs = timeout;
t.sourceBase = source;
t.source = extended;
@ -507,64 +646,8 @@ namespace Aurora::Loop
bool shouldRemove {true};
auto &source = *queueIterator.itr;
if (source.source)
{
if (source.source->OnTrigger(source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]))
{
AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack
for (const auto &handler : source.callbacks.extended)
{
try
{
shouldRemove &= handler->OnFinished(source.source, 0);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (shouldRemove)
{
for (const auto &handler : source.callbacks.base)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if (shouldRemove)
{
AU_LOCK_GUARD(this->sourceMutex_); // TODO...
for (const auto &handler : this->globalSubcribers_)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if ((source.callbacks.base.empty()) &&
(source.callbacks.extended.empty()))
{
shouldRemove = false;
}
}
}
auto [ticked, bShouldRemove] = source.DoWork(this, source.source->Singular() ? source.source->GetHandle() : source.source->GetHandles()[0]);
if (!shouldRemove && source.ConsiderTimeout(now))
{
shouldRemove = true;
@ -932,75 +1015,17 @@ namespace Aurora::Loop
triggeredCount++;
shouldRemove = true;
AU_LOCK_GUARD(const_cast<AuThreadPrimitives::SpinLock *>(&source.lock)) // this spinlock really does feel like a hack
auto [ticked, bShouldRemove] = source.DoWork(this, -1);
shouldRemove = bShouldRemove;
for (const auto &handler : source.callbacks.extended)
if (trigger)
{
try
{
shouldRemove &= handler->OnFinished(source.source, 0);
}
catch (...)
{
SysPushErrorCatch();
}
}
if (shouldRemove)
{
for (const auto &handler : source.callbacks.base)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if (shouldRemove)
{
AU_LOCK_GUARD(this->sourceMutex_); // TODO...
for (const auto &handler : globalSubcribers_)
{
try
{
shouldRemove &= handler->OnFinished(source.source);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if ((source.callbacks.base.empty()) &&
(source.callbacks.extended.empty()))
{
shouldRemove = false;
AuTryInsert(*trigger, source.source);
}
}
}
}
if (shouldRemove)
{
if (trigger)
{
AuTryInsert(*trigger, source.source);
}
}
else
{
if (source.ConsiderTimeout(now))
{
shouldRemove = true;
}
}
source.source->OnFinishSleep();
if (shouldRemove)

View File

@ -55,36 +55,18 @@ namespace Aurora::Loop
AuList<AuSPtr<ILoopSourceSubscriberEx>> extended;
};
struct ExtendeSourceInfo
struct ExtendedSourceInfo
{
inline ExtendeSourceInfo(AuSPtr<ILoopSourceEx> in) : source(in)
{}
ExtendedSourceInfo(const AuSPtr<ILoopSourceEx> &in);
AuSPtr<ILoopSourceEx> source;
AuSPtr<ILoopSource> sourceBase;
AuThreadPrimitives::SpinLock lock; // im too brain dead to think of a solution to the AddCallback thread safety issue, so i'm going to optimize Wait[...]s subscriber lookup with filtered ccaches in here, protected by this spinlock
AuUInt64 timeoutAbs;
SourceCallbacks callbacks;
bool ConsiderTimeout(AuUInt64 time) const
{
if ((timeoutAbs) && (time >= timeoutAbs))
{
for (const auto &handler : callbacks.extended)
{
try
{
handler->OnTimeout(sourceBase);
}
catch (...)
{
SysPushErrorCatch();
}
}
return true;
}
return false;
}
bool ConsiderTimeout(AuUInt64 time) const;
AuPair<bool, bool> DoWork(LoopQueue* queue, AuUInt handle);
};
bool IsValid();
@ -123,7 +105,7 @@ namespace Aurora::Loop
HANDLE hEvent_ {INVALID_HANDLE_VALUE};
HANDLE hDummy_ {INVALID_HANDLE_VALUE};
AuList<ExtendeSourceInfo> loopSourceExs_;
AuList<ExtendedSourceInfo> loopSourceExs_;
bool bLargeCommitQueueDirty {};
struct Iterator
@ -139,7 +121,6 @@ namespace Aurora::Loop
return this->base->loopSourceExs_.end();
}
inline void Start()
{
this->startingIndexAnd = 0;
@ -194,7 +175,7 @@ namespace Aurora::Loop
AuUInt startingIndexOr {};
AuUInt startingIndexAnd {};
AuToIterator_t<AuList<ExtendeSourceInfo>> itr;
AuToIterator_t<AuList<ExtendedSourceInfo>> itr;
};