[+] IWorkItem::SetSchedByLoopSourceRepeating

[+] ETickType::eRerunAndNotifyRepeatingWaiters
[+] ETickType::eScheduleAndNotifyRepeatingWaiters
[*] ETickType::eRerun async work items of ANY io fences
This commit is contained in:
Reece Wilson 2024-02-27 11:03:42 +00:00
parent 1f143ae981
commit 4ef055f81e
5 changed files with 135 additions and 41 deletions

View File

@ -14,7 +14,9 @@ namespace Aurora::Async
( (
eFinished, eFinished,
eRerun, eRerun,
eRerunAndNotifyRepeatingWaiters,
eSchedule, eSchedule,
eScheduleAndNotifyRepeatingWaiters,
eFailed eFailed
)); ));
} }

View File

@ -16,7 +16,9 @@ namespace Aurora::Async
// do not schedule a loop-source more than once // do not schedule a loop-source more than once
// you may ILoopSource::WaitOn arbitrarily; however, you must not attach it to the threads loopqueue more than once. // you may ILoopSource::WaitOn arbitrarily; however, you must not attach it to the threads loopqueue more than once.
virtual AuSPtr<IWorkItem> SetSchedByLoopSource(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) = 0; virtual AuSPtr<IWorkItem> SetSchedByLoopSourceOnce(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) = 0;
virtual AuSPtr<IWorkItem> SetSchedByLoopSourceRepeating(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) = 0;
// ms = time relative to the current time in milliseconds // ms = time relative to the current time in milliseconds
virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0; virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;

View File

@ -169,8 +169,26 @@ namespace Aurora::Async
return AU_SHARED_FROM_THIS; return AU_SHARED_FROM_THIS;
} }
AuSPtr<IWorkItem> WorkItem::SetSchedByLoopSource(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) AuSPtr<IWorkItem> WorkItem::SetSchedByLoopSourceRepeating(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource)
{ {
if (this->pIOWatchLS)
{
this->Fail();
return {};
}
this->bIoRepeating = true;
this->pIOWatchLS = pLoopSource;
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::SetSchedByLoopSourceOnce(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource)
{
if (this->pIOWatchLS)
{
this->Fail();
return {};
}
this->bIoRepeating = false;
this->pIOWatchLS = pLoopSource; this->pIOWatchLS = pLoopSource;
return AU_SHARED_FROM_THIS; return AU_SHARED_FROM_THIS;
} }
@ -199,14 +217,14 @@ namespace Aurora::Async
return AU_SHARED_FROM_THIS; return AU_SHARED_FROM_THIS;
} }
void WorkItem::DispatchEx(bool check) void WorkItem::DispatchEx(bool check, bool bIsIOTick)
{ {
AU_LOCK_GUARD(this->lock); AU_LOCK_GUARD(this->lock);
DispatchExLocked(check); DispatchExLocked(check);
} }
void WorkItem::DispatchExLocked(bool check) void WorkItem::DispatchExLocked(bool check, bool bIsIOTick)
{ {
if (check) if (check)
{ {
@ -221,36 +239,54 @@ namespace Aurora::Async
return; return;
} }
for (auto itr = waitOn_.begin(); itr != waitOn_.end(); ) for (auto itr = this->waitOn_.begin(); itr != this->waitOn_.end(); )
{ {
auto &waitable = *itr; auto &waitable = *itr;
if (!waitable->HasFinished()) if (AuStaticCast<WorkItem>(waitable)->HasFinishedRepeatable())
{
// noop
}
else if (waitable->HasFinished())
{
itr = this->waitOn_.erase(itr);
}
else
{ {
return; return;
} }
itr = waitOn_.erase(itr);
} }
this->dispatchPending_ = true; this->dispatchPending_ = true;
if (this->pIOWatchLS) if (this->ioTickCount)
{ {
if (!this->pIOWatchLS->IsSignaled()) // Bypass IO tick check / I owe you
}
else if (!bIsIOTick && this->pIOWatchLS)
{
if (this->pIOWatchLS &&
!this->pIOWatch && // should sched?
!this->pIOWatchLS->IsSignaled()) // fast path to avoid scheduling
{ {
// Try schedule outside of an IO tick
if (!Schedule()) if (!Schedule())
{ {
this->Fail(); this->Fail();
} }
return; return;
} }
else else if (!this->pIOWatchLS->IsSignaled()) // fast path to avoid bailing out
{ {
AuResetMember(this->pIOWatchLS); return;
} }
} }
if (bIsIOTick)
{
this->ioTickCount++;
}
if (Time::SteadyClockNS() < this->dispatchTimeNs_) if (Time::SteadyClockNS() < this->dispatchTimeNs_)
{ {
if (!Schedule()) if (!Schedule())
@ -270,6 +306,11 @@ namespace Aurora::Async
return; return;
} }
if (this->ioTickCount)
{
this->ioTickCount--;
}
SendOff(); SendOff();
} }
@ -331,6 +372,14 @@ namespace Aurora::Async
return; return;
} }
bool bRerun = (this->pIOWatchLS && this->bIoRepeating) ||
this->waitOn_.size();
if (bRerun)
{
info = ETickType::eScheduleAndNotifyRepeatingWaiters;
}
if (this->task_) if (this->task_)
{ {
try try
@ -395,6 +444,18 @@ namespace Aurora::Async
void WorkItem::RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info) void WorkItem::RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info)
{ {
if (info.type != ETickType::eScheduleAndNotifyRepeatingWaiters &&
info.type != ETickType::eRerunAndNotifyRepeatingWaiters)
{
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
{
pIOWatch->StopWatch();
}
AuResetMember(this->pIOWatchLS);
}
this->finished2 = false;
switch (info.type) switch (info.type)
{ {
@ -410,6 +471,7 @@ namespace Aurora::Async
break; break;
} }
case ETickType::eSchedule: case ETickType::eSchedule:
case ETickType::eScheduleAndNotifyRepeatingWaiters:
{ {
if (info.reschedMs) if (info.reschedMs)
{ {
@ -433,7 +495,7 @@ namespace Aurora::Async
} }
else if (info.pLoopSource) else if (info.pLoopSource)
{ {
SetSchedByLoopSource(info.pLoopSource); SetSchedByLoopSourceOnce(info.pLoopSource);
} }
if (!WaitForLocked(info.waitFor)) if (!WaitForLocked(info.waitFor))
@ -443,9 +505,17 @@ namespace Aurora::Async
} }
[[fallthrough]]; [[fallthrough]];
case ETickType::eRerun: case ETickType::eRerun:
case ETickType::eRerunAndNotifyRepeatingWaiters:
{ {
DispatchExLocked(false); DispatchEx(false);
return;
if (info.type == ETickType::eRerun ||
info.type == ETickType::eSchedule)
{
return;
}
this->finished2 = true;
} }
case ETickType::eFailed: case ETickType::eFailed:
{ {
@ -454,28 +524,26 @@ namespace Aurora::Async
} }
} }
this->finished = true; this->dispatchPending_ = false;
if (this->finishedEvent_) this->finished = !this->finished2;
{
this->finishedEvent_->Set(); this->finishedEvent_->Set();
}
for (auto &waiter : AuExchange(this->waiters_, {})) for (auto &waiter : AuExchange(this->waiters_, {}))
{ {
AuReinterpretCast<WorkItem>(waiter)->DispatchExLocked(true); AuReinterpretCast<WorkItem>(waiter)->DispatchEx(true);
} }
this->waitOn_.clear(); if (this->finished2)
if (auto pIOWatch = AuExchange(this->pIOWatch, {}))
{ {
pIOWatch->StopWatch(); this->finished2 = false;
this->finishedEvent_->Reset();
}
else
{
this->Cleanup();
} }
this->Cleanup();
AuResetMember(this->pIOWatchLS);
} }
void WorkItem::Fail() void WorkItem::Fail()
@ -569,12 +637,17 @@ namespace Aurora::Async
AuUnsafeRaiiToShared(&waitProxy), AuUnsafeRaiiToShared(&waitProxy),
0 /*forever*/); 0 /*forever*/);
} }
bool WorkItem::HasFinished() bool WorkItem::HasFinished()
{ {
return this->finished; return this->finished;
} }
bool WorkItem::HasFinishedRepeatable()
{
return this->finished2;
}
void WorkItem::Cancel() void WorkItem::Cancel()
{ {
AU_LOCK_GUARD(this->lock2); AU_LOCK_GUARD(this->lock2);
@ -606,17 +679,29 @@ namespace Aurora::Async
{ {
return false; return false;
} }
AU_DEBUG_MEMCRUNCH;
AuWPtr<WorkItem> wptr = this->SharedFromThis();
this->pIOWatch = pIOProcessor->StartSimpleLSWatch(pLoopSource, AuMakeSharedThrow<AuIO::IIOSimpleEventListenerFunctional>([=]() this->pIOWatch = pIOProcessor->StartSimpleLSWatchEx(pLoopSource, AuMakeSharedThrow<AuIO::IIOSimpleEventListenerFunctional>([=]()
{ {
this->Dispatch(); if (auto pThat = AuTryLockMemoryType(wptr))
{
pThat->DispatchEx(false, true);
}
}, [=]() }, [=]()
{ {
this->Dispatch(); if (auto pThat = AuTryLockMemoryType(wptr))
{
pThat->DispatchEx(false, true);
}
}, [=]() }, [=]()
{ {
this->Dispatch(); if (auto pThat = AuTryLockMemoryType(wptr))
})); {
pThat->DispatchEx(false, true);
}
}), !this->bIoRepeating);
if (!this->pIOWatch) if (!this->pIOWatch)
{ {

View File

@ -33,13 +33,15 @@ namespace Aurora::Async
AuSPtr<IWorkItem> SetSchedTimeAbs(AuUInt32 ms) override; AuSPtr<IWorkItem> SetSchedTimeAbs(AuUInt32 ms) override;
AuSPtr<IWorkItem> SetSchedTimeNsAbs(AuUInt64 ns) override; AuSPtr<IWorkItem> SetSchedTimeNsAbs(AuUInt64 ns) override;
AuSPtr<IWorkItem> SetSchedSteadyTimeNsAbs(AuUInt64 ns) override; AuSPtr<IWorkItem> SetSchedSteadyTimeNsAbs(AuUInt64 ns) override;
AuSPtr<IWorkItem> SetSchedByLoopSource(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) override; AuSPtr<IWorkItem> SetSchedByLoopSourceOnce(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) override;
AuSPtr<IWorkItem> SetSchedByLoopSourceRepeating(const AuSPtr<IO::Loop::ILoopSource> &pLoopSource) override;
AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) override; AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) override;
AuSPtr<IWorkItem> Dispatch() override; AuSPtr<IWorkItem> Dispatch() override;
bool BlockUntilComplete() override; bool BlockUntilComplete() override;
bool HasFinished() override; bool HasFinished() override;
bool HasFinishedRepeatable();
bool HasFailed() override; bool HasFailed() override;
void Cancel() override; void Cancel() override;
@ -67,8 +69,8 @@ namespace Aurora::Async
void RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info); void RunAsyncLocked2(const IWorkItemHandler::ProcessInfo &info);
bool WaitForLocked(const AuList<AuSPtr<IWorkItem>> &workItem); bool WaitForLocked(const AuList<AuSPtr<IWorkItem>> &workItem);
void DispatchEx(bool check); void DispatchEx(bool check, bool bIsIOTick = false);
void DispatchExLocked(bool check); void DispatchExLocked(bool check, bool bIsIOTick = false);
IThreadPoolInternal *owner_ {}; IThreadPoolInternal *owner_ {};
AuSPtr<ThreadState> GetState(); AuSPtr<ThreadState> GetState();
@ -87,8 +89,11 @@ namespace Aurora::Async
AuSPtr<AuIO::Loop::ILoopSource> pIOWatchLS; AuSPtr<AuIO::Loop::ILoopSource> pIOWatchLS;
bool finished {}; bool finished {};
bool finished2 {};
bool failed {}; bool failed {};
bool bIoRepeating {};
bool dispatchPending_ {}; bool dispatchPending_ {};
AuUInt32 ioTickCount {};
AuUInt64 dispatchTimeNs_ {}; AuUInt64 dispatchTimeNs_ {};
AuUInt64 delayTimeNs_ {}; AuUInt64 delayTimeNs_ {};

View File

@ -294,7 +294,7 @@ namespace Aurora::IO::CompletionGroup
return {}; return {};
} }
pRet->SetSchedByLoopSource(pEvent)->Dispatch(); pRet->SetSchedByLoopSourceOnce(pEvent)->Dispatch();
} }
else else
{ {
@ -304,7 +304,7 @@ namespace Aurora::IO::CompletionGroup
return {}; return {};
} }
pRet->SetSchedByLoopSource(pEvent)->Dispatch(); pRet->SetSchedByLoopSourceOnce(pEvent)->Dispatch();
this->bNoAny = true; this->bNoAny = true;
} }
@ -334,7 +334,7 @@ namespace Aurora::IO::CompletionGroup
return {}; return {};
} }
pRet->SetSchedByLoopSource(this->anyProbablyAlwaysPresentEvent.GetLoopSource())->Dispatch(); pRet->SetSchedByLoopSourceRepeating(this->anyProbablyAlwaysPresentEvent.GetLoopSource())->Dispatch();
this->pAnyBarrier = pRet; this->pAnyBarrier = pRet;
return pRet; return pRet;
} }