/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuIOProcessor.cpp Date: 2022-6-6 Author: Reece ***/ #include #include #include "AuIOProcessorItem.hpp" #include "AuIOProcessorItems.hpp" #include "AuIOProcessor.hpp" #include "AuIOPipeProcessor.hpp" #include "Loop/Loop.hpp" #include "Loop/LoopQueue.hpp" #include namespace Aurora::IO { IOProcessor::IOProcessor(AuUInt threadId, bool bTickOnly, AuOptionalEx worker, const AuSPtr &pLoopQueue, bool bIsNoQueue) : mutliplexIOAndTimer(!bTickOnly), pLoopQueue(pLoopQueue), asyncWorker(worker), threadId(threadId), streamProcessors(this), bIsNoQueue(bIsNoQueue) { } IOProcessor::~IOProcessor() { this->ReleaseAllWatches(); this->items.Deinit(); } bool IOProcessor::Init() { auto pQueue = this->ToQueue(); if (!pQueue) { return {}; } if (!this->items.Init()) { SysPushErrorNested(); return false; } auto pTimer = AuLoop::NewLSTimer(0); if (!pTimer) { SysPushErrorNested(); return false; } if (!this->timers.Init(this, pTimer)) { SysPushErrorNested(); return false; } if (!pQueue->SourceAdd(this->items.cvEvent)) { SysPushErrorNested(); return false; } if (!this->bIsNoQueue) { auto pAS = AuMakeShared([pThat = AuSharedFromThis()](const AuSPtr &pSource) -> bool { pThat->ManualTick(); return false; }); if (!pQueue->AddCallback(this->items.cvEvent, pAS)) { return false; } } (void)pQueue->Commit(); if (asyncWorker) { AuAtomicAdd(&AuStaticCast(asyncWorker.value().GetPool())->uAtomicIOProcessors, 1u); } return true; } bool IOProcessor::QueueIOEvent(const AuSPtr &ioEvent) { return this->items.AddFrameOrFallback(AuStaticCast(ioEvent)); } AuUInt32 IOProcessor::TryTick() { if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); return 0; } AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameRunThreadIO(); FrameRunCheckLSes(); bool bHasAlertedItems = this->items.workSignaled.size() || this->timers.nbTicker.HasPassed(); if (!bHasAlertedItems) { ReportState(EIOProcessorEventStage::eFrameEndOfFrame); return 0; } return FrameRunEpilogue(); } bool IOProcessor::TickForRegister(const AuSPtr &ioEvent) { return this->items.AddFrameOrFallback(AuStaticCast(ioEvent)); } void IOProcessor::TickForHack(const AuSPtr &ioEvent) { SysAssert(this->TickForRegister(ioEvent)); if (AuExchange(this->bScheduled_, true)) { return; } AuStaticCast(this->pLoopQueue)->AddHook([that = AuSharedFromThis()]() { that->RunTick(); }); } AuUInt32 IOProcessor::TickFor(const AuSPtr &ioEvent) { SysAssert(TickForRegister(ioEvent)); if (IsTickOnly()) { return 0; } if (this->bFrameStart && !this->bNoDeferredTick) { return 0; } if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); return 0; } AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameRunThreadIO(); FrameRunCheckLSes(); return FrameRunEpilogue(); } AuUInt32 IOProcessor::RunTick() { if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); AuIO::IOSleep(1); return 0; } this->bScheduled_ = false; this->bFrameStart = false; AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameWaitForAny(0); FramePumpWaitingBlocked(); FrameRunThreadIO(); FrameRunCheckLSes(); return FrameRunEpilogue(); } AuUInt32 IOProcessor::RunTickEx(AuUInt32 dwTimeout) { if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); AuIO::IOSleep(1); return 0; } this->bScheduled_ = false; this->bFrameStart = false; AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameWaitForAny(dwTimeout); FramePumpWaitingBlocked(); FrameRunThreadIO(); FrameRunCheckLSes(); return FrameRunEpilogue(); } AuUInt32 IOProcessor::ManualTick() { if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); return 0; } this->bFrameStart = false; AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameRunThreadIO(); FrameRunCheckLSes(); return FrameRunEpilogue(); } void IOProcessor::FrameRunIOWorkUnits() { AU_LOCK_GUARD(this->items.mutex); while (this->workUnits.size()) { for (auto workItems : AuExchange(this->workUnits, {})) { workItems->OnRun(); } } } AuUInt IOProcessor::FrameRunEpilogue() { FrameRunAlerted(); FrameRunAlertedSniffers(); FrameRunIOWorkUnits(); IO::SendBatched(); FrameEndOfFrameEvents(); return FrameFinalize(); } void IOProcessor::FrameRunCheckLSes() { if (!this->IsTickOnly()) { return; } for (auto &item : this->items.allItems) { if (item->pItem->IsRunOnSelfIO() && item->pItem->IsRunOnSelfIOCheckedOnTimerTick()) { auto ls = item->pItem->GetSelfIOSource(); if (ls && ls->IsSignaled()) { // Should deadlock without critical sections this->QueueIOEvent(item); } } } } void IOProcessor::FrameRunAlerted() { for (const auto &a : this->items.onTickReceivers) { if (a->pListener) { try { a->pListener->Tick_RunOnTick(); if (!AuExists(this->items.workSignaled, a) && !AuExists(this->items.onTickReceivers, a)) { a->pListener->Tick_Any(); } } catch (...) { SysPushErrorCatch(); } } SysAssert(AuTryInsert(this->items.finalizeQueue, AuConstReference(a))); } for (const auto &a : this->items.workSignaled) { if (a->pListener) { try { a->pListener->Tick_SelfIOEvent(); a->pListener->Tick_Any(); } catch (...) { SysPushErrorCatch(); } } SysAssert(AuTryInsert(this->items.finalizeQueue, AuConstReference(a))); } } void IOProcessor::FrameRunAlertedSniffers() { if (this->items.workSignaled.empty()) { return; } for (auto &a : this->items.onOtherReceivers) { if (AuExists(this->items.workSignaled, a)) { continue; } if (a->pListener) { try { a->pListener->Tick_OtherIOEvent(); a->pListener->Tick_Any(); } catch (...) { SysPushErrorCatch(); SysAssert(a->FailWatch()); } } this->items.finalizeQueue.push_back(a); } } void IOProcessor::FrameEndOfFrameEvents() { ReportState(EIOProcessorEventStage::eFrameWorkDone); for (auto &pumped : this->items.finalizeQueue) { if (!pumped->pListener) { continue; } try { pumped->pListener->Tick_FrameEpilogue(); } catch (...) { SysPushErrorCatch(); pumped->FailWatch(); } } auto hasRemoved = this->items.crossThreadAbort.size(); for (auto itr = this->items.crossThreadAbort.begin(); itr != this->items.crossThreadAbort.end(); ) { bool fatal = itr->second; auto item = itr->first; this->ClearProcessor(item, fatal); itr = this->items.crossThreadAbort.erase(itr); } if (hasRemoved) { ToQueue()->Commit(); } } void IOProcessor::ClearProcessor(const AuSPtr &processor, bool fatal) { if (!AuTryRemove(this->items.allItems, processor)) { return; } try { if (processor->pListener) { if (fatal) { processor->pListener->OnFailureCompletion(); } else { processor->pListener->OnNominalCompletion(); } } } catch (...) { SysPushErrorCatch(); } auto item = processor->pItem; if (item->IsRunOnTick()) { if (!AuTryRemove(this->items.onTickReceivers, processor)) { SysPushErrorMem(); } } if (item->IsRunOnOtherTick()) { if (!AuTryRemove(this->items.onOtherReceivers, processor)) { SysPushErrorMem(); } } if (item->IsRunOnSelfIO()) { if (!AuTryRemove(this->items.registeredIO, processor)) { SysPushErrorMem(); } auto src = item->GetSelfIOSource(); if (!src) { SysPushErrorGeneric(); return; } if (!this->ToQueue()->SourceRemove(src)) { SysPushErrorNested("IO Remove Error [!!!]"); } } processor->Finalize(); } AuUInt IOProcessor::FrameFinalize() { auto C = this->items.finalizeQueue.size(); this->items.workSignaled.clear(); this->items.finalizeQueue.clear(); ReportState(EIOProcessorEventStage::eFrameEndOfFrame); this->bFrameStart = false; return C; } bool IOProcessor::InternalIsTickReady() { // TODO: tickless io ad-hoc event-as-condvar? return this->items.workSignaled.size() || this->items.crossThreadAbort.size() || this->workUnits.size(); } bool IOProcessor::FrameWaitForAny(AuUInt32 msMax) { if (InternalIsTickReady()) { return true; } if (this->IsTickOnly()) { auto next = this->timers.nbTicker.nextTriggerTime; auto now = AuTime::SteadyClockNS(); if (now >= next) { return true; } auto delay = AuNSToMS(next - now); #if 0 if (delay == 1) { bool ret {}; while (IO::IOYield()); } else #endif { return IO::WaitFor(msMax ? AuMin(msMax, delay) : delay, true); } } else { return this->pLoopQueue->WaitAny(msMax); } } bool IOProcessor::SubmitIOWorkItem(const AuSPtr &work) { AU_LOCK_GUARD(this->items.mutex); // < critical section / reentrant | can nest submission this->items.cvEvent->Set(); return AuTryInsert(this->workUnits, work); } void IOProcessor::WakeupThread() { this->items.cvEvent->Set(); } bool IOProcessor::AddEventListener(const AuSPtr &eventListener) { AU_LOCK_GUARD(this->listenersSpinLock->AsWritable()); return AuTryInsert(this->listeners, eventListener); } void IOProcessor::RemoveEventListener(const AuSPtr &eventListener) { AU_LOCK_GUARD(this->listenersSpinLock->AsWritable()); AuTryRemove(this->listeners, eventListener); } void IOProcessor::DispatchFrame(ProcessInfo &info) { this->ManualTick(); auto ns = this->refreshRateNs; if (ns) { info = AuAsync::ETickType::eSchedule; info.reschedNs = ns; } } void IOProcessor::FramePumpWaitingBlocked() { auto blocked = this->items.GetBlockedSignals(); if (blocked.size()) { this->items.workSignaled.insert(this->items.workSignaled.end(), blocked.begin(), blocked.end()); } } void IOProcessor::FrameStart() { ReportState(EIOProcessorEventStage::eFrameStartOfFrame); this->bFrameStart = true; FramePumpWaitingBlocked(); } void IOProcessor::FrameRunThreadIO() { ReportState(EIOProcessorEventStage::eFrameYieldIO); while (IO::IOYield()); ReportState(EIOProcessorEventStage::eFrameIODone); } AuUInt64 IOProcessor::SetRefreshRate(AuUInt64 ns) { if (!CheckThread()) { AU_THROW_STRING("Wrong Thread"); } bool bBinaryCanged = bool(ns) != bool(this->refreshRateNs); auto old = AuExchange(this->refreshRateNs, ns); UpdateTimers(); if (bBinaryCanged) { if (ns) { AddTimer(); } else { RemoveTimer(); } } return old; } bool IOProcessor::HasRefreshRate() { return bool(this->refreshRateNs); } bool IOProcessor::MultiplexRefreshRateWithIOEvents() { return this->mutliplexIOAndTimer; } AuUInt64 IOProcessor::GetOwnedThreadId() { return this->threadId; } void IOProcessor::UpdateTimers() { this->timers.pLsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs); this->timers.nbTicker.nsTimeStep = this->refreshRateNs; if (!this->timers.nbTicker.nextTriggerTime) { this->timers.nbTicker.nextTriggerTime = AuTime::SteadyClockNS() + this->timers.nbTicker.nsTimeStep; } } void IOProcessor::AddTimer() { if (IsAsync()) { StartAsyncTimerIfAny(); } else { AddTimerLS(); } } void IOProcessor::AddTimerLS() { if (auto optWorker = this->asyncWorker) { AuAtomicAdd(&AuStaticCast(optWorker.value().GetPool())->uAtomicIOProcessorsWorthlessSources, 1u); } this->ToQueue()->SourceAdd(this->timers.pLsTicker); this->ToQueue()->AddCallback(this->timers.pLsTicker, AuSPtr(AuSharedFromThis(), &this->timers)); this->ToQueue()->Commit(); } void IOProcessor::StartAsyncTimerIfAny() { if (!this->pWorkItem) { this->pWorkItem = this->asyncWorker.value().GetPool()->NewWorkItem(this->asyncWorker.value(), AuSharedFromThis()); } this->pWorkItem->SetSchedTimeNs(this->refreshRateNs); this->pWorkItem->Dispatch(); } void IOProcessor::RemoveTimer() { if (IsAsync()) { CancelWorkItem(); } else { RemoveLSTimer(); } } void IOProcessor::CancelWorkItem() { if (!this->pWorkItem) { return; } this->pWorkItem->Cancel(); this->pWorkItem.reset(); } void IOProcessor::RemoveLSTimer() { auto queue = this->ToQueue(); if (!queue) { return; } queue->SourceRemove(this->timers.pLsTicker); if (auto optWorker = this->asyncWorker) { AuAtomicSub(&AuStaticCast(optWorker.value().GetPool())->uAtomicIOProcessorsWorthlessSources, 1u); } } bool IOProcessor::IsAsync() { return bool(this->asyncWorker); } bool IOProcessor::IsTickOnly() { return !this->mutliplexIOAndTimer && this->timers.nbTicker.nextTriggerTime; } bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr &pProcessor) { return {}; } AuSPtr IOProcessor::StartIOWatchEx(const AuSPtr &pItem, const AuSPtr &pListener, bool bSingleshot) { auto item = AuMakeShared(); if (!item) { SysPushErrorMemory(); return {}; } item->pParent = this; item->pListener = pListener; item->pItem = pItem; item->bSingleshot = bSingleshot; AU_LOCK_GUARD(this->items.mutex); if (!AuTryInsert(this->items.allItems, AuConstReference(item))) { return {}; } if (pItem->IsRunOnTick()) { if (!AuTryInsert(this->items.onTickReceivers, AuConstReference(item))) { SysPushErrorMemory(); return {}; } } if (pItem->IsRunOnOtherTick()) { if (!AuTryInsert(this->items.onOtherReceivers, AuConstReference(item))) { SysPushErrorMemory(); return {}; } } if (pItem->IsRunOnSelfIO()) { auto src = pItem->GetSelfIOSource(); if (!src) { SysPushErrorGeneric(); return {}; } auto timeout = pItem->IOTimeoutInMS(); if (timeout) { if (!this->ToQueue()->SourceAddWithTimeout(src, timeout)) { SysPushErrorNested("IO Error"); return {}; } } else { if (!this->ToQueue()->SourceAdd(src)) { SysPushErrorNested("IO Error"); return {}; } } if (!this->ToQueue()->AddCallbackEx(src, item)) { SysPushErrorNested("IO Error"); SysAssert(this->ToQueue()->SourceRemove(src)); return {}; } if (!this->ToQueue()->Commit()) { SysPushErrorIO("midframe recommit"); } if (!AuTryInsert(this->items.registeredIO, AuConstReference(item))) { SysPushErrorMem(); SysAssert(this->ToQueue()->SourceRemove(src)); return {}; } } if (pItem->CanRequestTick()) { pItem->OnReportPumper(item); } return item; } AuSPtr IOProcessor::StartSimpleIOWatchEx(const AuSPtr& object, const AuSPtr& listener, bool singleshot) { if (!listener) { return this->StartIOWatchEx(object, {}, singleshot); } auto adapter = DesimplifyIOEventListenerAdapter(listener); if (!adapter) { SysPushErrorMem(); return {}; } return this->StartIOWatchEx(object, adapter, singleshot); } AuSPtr IOProcessor::StartSimpleLSWatchEx(const AuSPtr& source, const AuSPtr& listener, bool singleshot, AuUInt32 msTimeout) { auto sourceAdapter = NewWaitableLoopSourceEx(source, msTimeout); if (!sourceAdapter) { SysPushErrorMem(); return {}; } return this->StartSimpleIOWatchEx(sourceAdapter, listener, singleshot); } AuSPtr IOProcessor::StartIOWatch(const AuSPtr &object, const AuSPtr &listener) { return StartIOWatchEx(object, listener, false); } AuSPtr IOProcessor::StartSimpleIOWatch(const AuSPtr &object, const AuSPtr &listener) { return this->StartSimpleIOWatchEx(object, listener, false); } AuSPtr IOProcessor::StartSimpleLSWatch(const AuSPtr &source, const AuSPtr &listener) { return this->StartSimpleLSWatchEx(source, listener, false, 0); } AuSPtr IOProcessor::ToPipeProcessor() { if (!CheckThread()) { AU_THROW_STRING("Wrong Thread"); } return AuSPtr(AuSharedFromThis(), &this->streamProcessors); } AuSPtr IOProcessor::ToQueue() { return this->pLoopQueue; } bool IOProcessor::ConfigureNoDeferredTicks(bool bOption) { return AuExchange(this->bNoDeferredTick, bOption); } void IOProcessor::ReleaseAllWatches() { if (auto optWorker = this->asyncWorker) { AuAtomicSub(&AuStaticCast(optWorker.value().GetPool())->uAtomicIOProcessors, 1u); } RemoveTimer(); auto queue = ToQueue(); if (queue) { if (this->items.cvEvent) { queue->SourceRemove(this->items.cvEvent); } for (auto &io : this->items.registeredIO) { queue->SourceRemove(io->pItem->GetSelfIOSource()); } queue->Commit(); } } bool IOProcessor::HasItems() { bool bVal = bool(this->items.allItems.size()) || bool(this->workUnits.size()) || this->InternalIsTickReady(); return bVal; } void IOProcessor::ReportState(EIOProcessorEventStage stage) { AU_LOCK_GUARD(this->listenersSpinLock->AsReadable()); for (auto &listeners : this->listeners) { try { listeners->OnIOProcessorStateEvent(stage); } catch (...) { SysPushErrorCatch(); } } } bool IOProcessor::CheckThread() { if (this->threadId == 0) { this->threadId = AuThreads::GetThreadId(); return true; } return this->threadId == AuThreads::GetThreadId(); } static AuSPtr NewIOProcessorEx(bool tickOnly, const AuSPtr &queue, bool bIsNoQueue) { auto processor = AuMakeShared(0, tickOnly, AuOptionalEx {}, queue, bIsNoQueue); SysCheckNotNullMemory(processor, {}); if (!processor->Init()) { SysPushErrorNested(); return {}; } return processor; } AUKN_SYM AuSPtr NewIOProcessor(bool tickOnly, const AuSPtr &queue) { auto processor = AuMakeShared(0, tickOnly, AuOptionalEx {}, queue, false); SysCheckNotNullMemory(processor, {}); if (!processor->Init()) { SysPushErrorNested(); return {}; } return processor; } AUKN_SYM AuSPtr NewIOProcessorOnThread(bool tickOnly, AuAsync::WorkerPId_t id) { if (!id.GetPool()) { SysPushErrorArg(); return {}; } auto thread = id.GetPool()->ResolveHandle(id); if (!thread) { SysPushErrorGeneric("Worker PID failed to resolve to a thread object"); return {}; } auto queue = id.GetPool()->ToKernelWorkQueue(id); if (!queue) { SysPushErrorGeneric("Worker PID has no kernel work queue"); return {}; } auto processor = AuMakeShared(AuUInt(thread.get()), tickOnly, id, queue, false); SysCheckNotNullMemory(processor, {}); if (!processor->Init()) { SysPushErrorNested(); return {}; } return processor; } AUKN_SYM AuSPtr NewIOProcessorNoQueue(bool tickOnly) { auto loop = AuLoop::NewLoopQueue(); SysCheckNotNull(loop, {}); return NewIOProcessorEx(tickOnly, loop, true); } }