AuroraRuntime/Source/IO/AuIOProcessor.cpp

1025 lines
26 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "AuIOProcessorItem.hpp"
#include "AuIOProcessorItems.hpp"
#include "AuIOProcessor.hpp"
#include "AuIOPipeProcessor.hpp"
#include "Loop/Loop.hpp"
#include "Loop/LoopQueue.hpp"
#include <Async/ThreadPool.hpp>
namespace Aurora::IO
{
IOProcessor::IOProcessor(AuUInt threadId,
bool bTickOnly,
AuOptionalEx<AuAsync::WorkerPId_t> worker,
const AuSPtr<AuLoop::ILoopQueue> &pLoopQueue,
bool bIsNoQueue) :
mutliplexIOAndTimer(!bTickOnly),
pLoopQueue(pLoopQueue),
asyncWorker(worker),
threadId(threadId),
streamProcessors(this),
bIsNoQueue(bIsNoQueue)
{
}
IOProcessor::~IOProcessor()
{
this->ReleaseAllWatches();
this->items.Deinit();
}
bool IOProcessor::Init()
{
auto pQueue = this->ToQueue();
if (!pQueue)
{
return {};
}
if (!this->items.Init())
{
SysPushErrorNested();
return false;
}
auto pTimer = AuLoop::NewLSTimer(0);
if (!pTimer)
{
SysPushErrorNested();
return false;
}
if (!this->timers.Init(this, pTimer))
{
SysPushErrorNested();
return false;
}
if (!pQueue->SourceAdd(this->items.cvEvent))
{
SysPushErrorNested();
return false;
}
if (!this->bIsNoQueue)
{
auto pAS = AuMakeShared<AuLoop::ILoopSourceSubscriberFunctional>([pThat = AuSharedFromThis()](const AuSPtr<AuLoop::ILoopSource> &pSource) -> bool
{
pThat->ManualTick();
return false;
});
if (!pQueue->AddCallback(this->items.cvEvent, pAS))
{
return false;
}
}
(void)pQueue->Commit();
if (asyncWorker)
{
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(asyncWorker.value().GetPool())->uAtomicIOProcessors, 1u);
}
return true;
}
bool IOProcessor::QueueIOEvent(const AuSPtr<IIOProcessorItem> &ioEvent)
{
return this->items.AddFrameOrFallback(AuStaticCast<IOProcessorItem>(ioEvent));
}
AuUInt32 IOProcessor::TryTick()
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
return 0;
}
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameRunThreadIO();
FrameRunCheckLSes();
bool bHasAlertedItems = this->items.workSignaled.size() ||
this->timers.nbTicker.HasPassed();
if (!bHasAlertedItems)
{
ReportState(EIOProcessorEventStage::eFrameEndOfFrame);
return 0;
}
return FrameRunEpilogue();
}
bool IOProcessor::TickForRegister(const AuSPtr<IIOProcessorItem> &ioEvent)
{
return this->items.AddFrameOrFallback(AuStaticCast<IOProcessorItem>(ioEvent));
}
void IOProcessor::TickForHack(const AuSPtr<IIOProcessorItem> &ioEvent)
{
SysAssert(this->TickForRegister(ioEvent));
if (AuExchange(this->bScheduled_, true))
{
return;
}
AuStaticCast<Loop::LoopQueue>(this->pLoopQueue)->AddHook([that = AuSharedFromThis()]()
{
that->RunTick();
});
}
AuUInt32 IOProcessor::TickFor(const AuSPtr<IIOProcessorItem> &ioEvent)
{
SysAssert(TickForRegister(ioEvent));
if (IsTickOnly())
{
return 0;
}
if (this->bFrameStart &&
!this->bNoDeferredTick)
{
return 0;
}
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
return 0;
}
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
AuUInt32 IOProcessor::RunTick()
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
AuIO::IOSleep(1);
return 0;
}
this->bScheduled_ = false;
this->bFrameStart = false;
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameWaitForAny(0);
FramePumpWaitingBlocked();
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
AuUInt32 IOProcessor::RunTickEx(AuUInt32 dwTimeout)
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
AuIO::IOSleep(1);
return 0;
}
this->bScheduled_ = false;
this->bFrameStart = false;
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameWaitForAny(dwTimeout);
FramePumpWaitingBlocked();
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
AuUInt32 IOProcessor::ManualTick()
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
return 0;
}
this->bFrameStart = false;
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
void IOProcessor::FrameRunIOWorkUnits()
{
AU_LOCK_GUARD(this->items.mutex);
while (this->workUnits.size())
{
for (auto workItems : AuExchange(this->workUnits, {}))
{
workItems->OnRun();
}
}
}
AuUInt IOProcessor::FrameRunEpilogue()
{
FrameRunAlerted();
FrameRunAlertedSniffers();
FrameRunIOWorkUnits();
IO::SendBatched();
FrameEndOfFrameEvents();
return FrameFinalize();
}
void IOProcessor::FrameRunCheckLSes()
{
if (!this->IsTickOnly())
{
return;
}
for (auto &item : this->items.allItems)
{
if (item->pItem->IsRunOnSelfIO() && item->pItem->IsRunOnSelfIOCheckedOnTimerTick())
{
auto ls = item->pItem->GetSelfIOSource();
if (ls && ls->IsSignaled())
{
// Should deadlock without critical sections
this->QueueIOEvent(item);
}
}
}
}
void IOProcessor::FrameRunAlerted()
{
for (const auto &a : this->items.onTickReceivers)
{
if (a->pListener)
{
try
{
a->pListener->Tick_RunOnTick();
if (!AuExists(this->items.workSignaled, a) &&
!AuExists(this->items.onTickReceivers, a))
{
a->pListener->Tick_Any();
}
}
catch (...)
{
SysPushErrorCatch();
}
}
SysAssert(AuTryInsert(this->items.finalizeQueue, AuConstReference(a)));
}
for (const auto &a : this->items.workSignaled)
{
if (a->pListener)
{
try
{
a->pListener->Tick_SelfIOEvent();
a->pListener->Tick_Any();
}
catch (...)
{
SysPushErrorCatch();
}
}
SysAssert(AuTryInsert(this->items.finalizeQueue, AuConstReference(a)));
}
}
void IOProcessor::FrameRunAlertedSniffers()
{
if (this->items.workSignaled.empty())
{
return;
}
for (auto &a : this->items.onOtherReceivers)
{
if (AuExists(this->items.workSignaled, a))
{
continue;
}
if (a->pListener)
{
try
{
a->pListener->Tick_OtherIOEvent();
a->pListener->Tick_Any();
}
catch (...)
{
SysPushErrorCatch();
SysAssert(a->FailWatch());
}
}
this->items.finalizeQueue.push_back(a);
}
}
void IOProcessor::FrameEndOfFrameEvents()
{
ReportState(EIOProcessorEventStage::eFrameWorkDone);
for (auto &pumped : this->items.finalizeQueue)
{
if (!pumped->pListener)
{
continue;
}
try
{
pumped->pListener->Tick_FrameEpilogue();
}
catch (...)
{
SysPushErrorCatch();
pumped->FailWatch();
}
}
auto hasRemoved = this->items.crossThreadAbort.size();
for (auto itr = this->items.crossThreadAbort.begin(); itr != this->items.crossThreadAbort.end(); )
{
bool fatal = itr->second;
auto item = itr->first;
this->ClearProcessor(item, fatal);
itr = this->items.crossThreadAbort.erase(itr);
}
if (hasRemoved)
{
ToQueue()->Commit();
}
}
void IOProcessor::ClearProcessor(const AuSPtr<IOProcessorItem> &processor, bool fatal)
{
if (!AuTryRemove(this->items.allItems, processor))
{
return;
}
try
{
if (processor->pListener)
{
if (fatal)
{
processor->pListener->OnFailureCompletion();
}
else
{
processor->pListener->OnNominalCompletion();
}
}
}
catch (...)
{
SysPushErrorCatch();
}
auto item = processor->pItem;
if (item->IsRunOnTick())
{
if (!AuTryRemove(this->items.onTickReceivers, processor))
{
SysPushErrorMem();
}
}
if (item->IsRunOnOtherTick())
{
if (!AuTryRemove(this->items.onOtherReceivers, processor))
{
SysPushErrorMem();
}
}
if (item->IsRunOnSelfIO())
{
if (!AuTryRemove(this->items.registeredIO, processor))
{
SysPushErrorMem();
}
auto src = item->GetSelfIOSource();
if (!src)
{
SysPushErrorGeneric();
return;
}
if (!this->ToQueue()->SourceRemove(src))
{
SysPushErrorNested("IO Remove Error [!!!]");
}
}
processor->Finalize();
}
AuUInt IOProcessor::FrameFinalize()
{
auto C = this->items.finalizeQueue.size();
this->items.workSignaled.clear();
this->items.finalizeQueue.clear();
ReportState(EIOProcessorEventStage::eFrameEndOfFrame);
this->bFrameStart = false;
return C;
}
bool IOProcessor::InternalIsTickReady()
{
// TODO: tickless io ad-hoc event-as-condvar?
return this->items.workSignaled.size() ||
this->items.crossThreadAbort.size() ||
this->workUnits.size();
}
bool IOProcessor::FrameWaitForAny(AuUInt32 msMax)
{
if (InternalIsTickReady())
{
return true;
}
if (this->IsTickOnly())
{
auto next = this->timers.nbTicker.nextTriggerTime;
auto now = AuTime::SteadyClockNS();
if (now >= next)
{
return true;
}
auto delay = AuNSToMS<AuUInt>(next - now);
#if 0
if (delay == 1)
{
bool ret {};
while (IO::IOYield());
}
else
#endif
{
return IO::WaitFor(msMax ? AuMin<AuUInt32>(msMax, delay) : delay, true);
}
}
else
{
return this->pLoopQueue->WaitAny(msMax);
}
}
bool IOProcessor::SubmitIOWorkItem(const AuSPtr<IIOProcessorWorkUnit> &work)
{
AU_LOCK_GUARD(this->items.mutex); // < critical section / reentrant | can nest submission
this->items.cvEvent->Set();
return AuTryInsert(this->workUnits, work);
}
void IOProcessor::WakeupThread()
{
this->items.cvEvent->Set();
if (this->asyncWorker)
{
auto worker = this->asyncWorker.Value();
worker.GetPool()->Wakeup(worker);
}
}
bool IOProcessor::AddEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener)
{
AU_LOCK_GUARD(this->listenersSpinLock->AsWritable());
return AuTryInsert(this->listeners, eventListener);
}
void IOProcessor::RemoveEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener)
{
AU_LOCK_GUARD(this->listenersSpinLock->AsWritable());
AuTryRemove(this->listeners, eventListener);
}
void IOProcessor::DispatchFrame(ProcessInfo &info)
{
this->ManualTick();
auto ns = this->refreshRateNs;
if (ns)
{
info = AuAsync::ETickType::eSchedule;
info.reschedNs = ns;
}
}
void IOProcessor::FramePumpWaitingBlocked()
{
auto blocked = this->items.GetBlockedSignals();
if (blocked.size())
{
this->items.workSignaled.insert(this->items.workSignaled.end(), blocked.begin(), blocked.end());
}
}
void IOProcessor::FrameStart()
{
ReportState(EIOProcessorEventStage::eFrameStartOfFrame);
this->bFrameStart = true;
FramePumpWaitingBlocked();
}
void IOProcessor::FrameRunThreadIO()
{
ReportState(EIOProcessorEventStage::eFrameYieldIO);
while (IO::IOYield());
ReportState(EIOProcessorEventStage::eFrameIODone);
}
AuUInt64 IOProcessor::SetRefreshRate(AuUInt64 ns)
{
if (!CheckThread())
{
AU_THROW_STRING("Wrong Thread");
}
bool bBinaryCanged = bool(ns) != bool(this->refreshRateNs);
auto old = AuExchange(this->refreshRateNs, ns);
UpdateTimers();
if (bBinaryCanged)
{
if (ns)
{
AddTimer();
}
else
{
RemoveTimer();
}
}
return old;
}
bool IOProcessor::HasRefreshRate()
{
return bool(this->refreshRateNs);
}
bool IOProcessor::MultiplexRefreshRateWithIOEvents()
{
return this->mutliplexIOAndTimer;
}
AuUInt64 IOProcessor::GetOwnedThreadId()
{
return this->threadId;
}
void IOProcessor::UpdateTimers()
{
this->timers.pLsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs);
this->timers.nbTicker.nsTimeStep = this->refreshRateNs;
if (!this->timers.nbTicker.nextTriggerTime)
{
this->timers.nbTicker.nextTriggerTime = AuTime::SteadyClockNS() + this->timers.nbTicker.nsTimeStep;
}
}
void IOProcessor::AddTimer()
{
if (IsAsync())
{
StartAsyncTimerIfAny();
}
else
{
AddTimerLS();
}
}
void IOProcessor::AddTimerLS()
{
if (auto optWorker = this->asyncWorker)
{
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().GetPool())->uAtomicIOProcessorsWorthlessSources, 1u);
}
this->ToQueue()->SourceAdd(this->timers.pLsTicker);
this->ToQueue()->AddCallback(this->timers.pLsTicker, AuSPtr<AuLoop::ILoopSourceSubscriber>(AuSharedFromThis(), &this->timers));
this->ToQueue()->Commit();
}
void IOProcessor::StartAsyncTimerIfAny()
{
if (!this->pWorkItem)
{
this->pWorkItem = this->asyncWorker.value().GetPool()->NewWorkItem(this->asyncWorker.value(), AuSharedFromThis());
}
this->pWorkItem->SetSchedTimeNs(this->refreshRateNs);
this->pWorkItem->Dispatch();
}
void IOProcessor::RemoveTimer()
{
if (IsAsync())
{
CancelWorkItem();
}
else
{
RemoveLSTimer();
}
}
void IOProcessor::CancelWorkItem()
{
if (!this->pWorkItem)
{
return;
}
this->pWorkItem->Cancel();
this->pWorkItem.reset();
}
void IOProcessor::RemoveLSTimer()
{
auto queue = this->ToQueue();
if (!queue)
{
return;
}
queue->SourceRemove(this->timers.pLsTicker);
if (auto optWorker = this->asyncWorker)
{
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().GetPool())->uAtomicIOProcessorsWorthlessSources, 1u);
}
}
bool IOProcessor::IsAsync()
{
return bool(this->asyncWorker);
}
bool IOProcessor::IsTickOnly()
{
return !this->mutliplexIOAndTimer &&
this->timers.nbTicker.nextTriggerTime;
}
bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr<IIOProcessorItem> &pProcessor)
{
return {};
}
AuSPtr<IIOProcessorItem> IOProcessor::StartIOWatchEx(const AuSPtr<IIOWaitableItem> &pItem,
const AuSPtr<IIOEventListener> &pListener,
bool bSingleshot)
{
auto item = AuMakeShared<IOProcessorItem>();
if (!item)
{
SysPushErrorMemory();
return {};
}
item->pParent = this;
item->pListener = pListener;
item->pItem = pItem;
item->bSingleshot = bSingleshot;
AU_LOCK_GUARD(this->items.mutex);
if (!AuTryInsert(this->items.allItems, AuConstReference(item)))
{
return {};
}
if (pItem->IsRunOnTick())
{
if (!AuTryInsert(this->items.onTickReceivers, AuConstReference(item)))
{
SysPushErrorMemory();
return {};
}
}
if (pItem->IsRunOnOtherTick())
{
if (!AuTryInsert(this->items.onOtherReceivers, AuConstReference(item)))
{
SysPushErrorMemory();
return {};
}
}
if (pItem->IsRunOnSelfIO())
{
auto src = pItem->GetSelfIOSource();
if (!src)
{
SysPushErrorGeneric();
return {};
}
auto timeout = pItem->IOTimeoutInMS();
if (timeout)
{
if (!this->ToQueue()->SourceAddWithTimeout(src, timeout))
{
SysPushErrorNested("IO Error");
return {};
}
}
else
{
if (!this->ToQueue()->SourceAdd(src))
{
SysPushErrorNested("IO Error");
return {};
}
}
if (!this->ToQueue()->AddCallbackEx(src, item))
{
SysPushErrorNested("IO Error");
SysAssert(this->ToQueue()->SourceRemove(src));
return {};
}
if (!this->ToQueue()->Commit())
{
SysPushErrorIO("midframe recommit");
}
if (!AuTryInsert(this->items.registeredIO, AuConstReference(item)))
{
SysPushErrorMem();
SysAssert(this->ToQueue()->SourceRemove(src));
return {};
}
}
if (pItem->CanRequestTick())
{
pItem->OnReportPumper(item);
}
return item;
}
AuSPtr<IIOProcessorItem> IOProcessor::StartSimpleIOWatchEx(const AuSPtr<IIOWaitableItem>& object, const AuSPtr<IIOSimpleEventListener>& listener, bool singleshot)
{
if (!listener)
{
return this->StartIOWatchEx(object, {}, singleshot);
}
auto adapter = DesimplifyIOEventListenerAdapter(listener);
if (!adapter)
{
SysPushErrorMem();
return {};
}
return this->StartIOWatchEx(object, adapter, singleshot);
}
AuSPtr<IIOProcessorItem> IOProcessor::StartSimpleLSWatchEx(const AuSPtr<Loop::ILoopSource>& source, const AuSPtr<IIOSimpleEventListener>& listener, bool singleshot, AuUInt32 msTimeout)
{
auto sourceAdapter = NewWaitableLoopSourceEx(source, msTimeout);
if (!sourceAdapter)
{
SysPushErrorMem();
return {};
}
return this->StartSimpleIOWatchEx(sourceAdapter, listener, singleshot);
}
AuSPtr<IIOProcessorItem> IOProcessor::StartIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener)
{
return StartIOWatchEx(object, listener, false);
}
AuSPtr<IIOProcessorItem> IOProcessor::StartSimpleIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener)
{
return this->StartSimpleIOWatchEx(object, listener, false);
}
AuSPtr<IIOProcessorItem> IOProcessor::StartSimpleLSWatch(const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener)
{
return this->StartSimpleLSWatchEx(source, listener, false, 0);
}
AuSPtr<IIOPipeProcessor> IOProcessor::ToPipeProcessor()
{
if (!CheckThread())
{
AU_THROW_STRING("Wrong Thread");
}
return AuSPtr<IIOPipeProcessor>(AuSharedFromThis(), &this->streamProcessors);
}
AuSPtr<AuLoop::ILoopQueue> IOProcessor::ToQueue()
{
return this->pLoopQueue;
}
bool IOProcessor::ConfigureNoDeferredTicks(bool bOption)
{
return AuExchange(this->bNoDeferredTick, bOption);
}
void IOProcessor::ReleaseAllWatches()
{
if (auto optWorker = this->asyncWorker)
{
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().GetPool())->uAtomicIOProcessors, 1u);
}
RemoveTimer();
auto queue = ToQueue();
if (queue)
{
if (this->items.cvEvent)
{
queue->SourceRemove(this->items.cvEvent);
}
for (auto &io : this->items.registeredIO)
{
queue->SourceRemove(io->pItem->GetSelfIOSource());
}
queue->Commit();
}
}
bool IOProcessor::HasItems()
{
bool bVal = bool(this->items.allItems.size()) ||
bool(this->workUnits.size()) || this->InternalIsTickReady();
return bVal;
}
void IOProcessor::ReportState(EIOProcessorEventStage stage)
{
AU_LOCK_GUARD(this->listenersSpinLock->AsReadable());
for (auto &listeners : this->listeners)
{
try
{
listeners->OnIOProcessorStateEvent(stage);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
bool IOProcessor::CheckThread()
{
if (this->threadId == 0)
{
this->threadId = AuThreads::GetThreadId();
return true;
}
return this->threadId == AuThreads::GetThreadId();
}
static AuSPtr<IIOProcessor> NewIOProcessorEx(bool tickOnly, const AuSPtr<AuLoop::ILoopQueue> &queue, bool bIsNoQueue)
{
auto processor = AuMakeShared<IOProcessor>(0, tickOnly, AuOptionalEx<AuAsync::WorkerPId_t> {}, queue, bIsNoQueue);
SysCheckNotNullMemory(processor, {});
if (!processor->Init())
{
SysPushErrorNested();
return {};
}
return processor;
}
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessor(bool tickOnly, const AuSPtr<AuLoop::ILoopQueue> &queue)
{
auto processor = AuMakeShared<IOProcessor>(0, tickOnly, AuOptionalEx<AuAsync::WorkerPId_t> {}, queue, false);
SysCheckNotNullMemory(processor, {});
if (!processor->Init())
{
SysPushErrorNested();
return {};
}
return processor;
}
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, AuAsync::WorkerPId_t id)
{
if (!id.GetPool())
{
SysPushErrorArg();
return {};
}
auto thread = id.GetPool()->ResolveHandle(id);
if (!thread)
{
SysPushErrorGeneric("Worker PID failed to resolve to a thread object");
return {};
}
auto queue = id.GetPool()->ToKernelWorkQueue(id);
if (!queue)
{
SysPushErrorGeneric("Worker PID has no kernel work queue");
return {};
}
auto processor = AuMakeShared<IOProcessor>(AuUInt(thread.get()), tickOnly, id, queue, false);
SysCheckNotNullMemory(processor, {});
if (!processor->Init())
{
SysPushErrorNested();
return {};
}
return processor;
}
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorNoQueue(bool tickOnly)
{
auto loop = AuLoop::NewLoopQueue();
SysCheckNotNull(loop, {});
return NewIOProcessorEx(tickOnly, loop, true);
}
}