/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IOProcessor.cpp Date: 2022-6-6 Author: Reece ***/ #include #include #include "IOProcessorItem.hpp" #include "IOProcessorItems.hpp" #include "IOProcessor.hpp" #include "IOPipeProcessor.hpp" #include "Loop/Loop.hpp" #include "Loop/LoopQueue.hpp" namespace Aurora::IO { IOProcessor::IOProcessor(AuUInt threadId, bool tickOnly, AuAsync::WorkerPId_t worker, const AuSPtr &loop) : mutliplexIOAndTimer(!tickOnly), loopQueue(loop), asyncWorker(worker), threadId(threadId), streamProcessors(this) { } IOProcessor::~IOProcessor() { this->ReleaseAllWatches(); } bool IOProcessor::Init() { if (!this->ToQueue()) { return {}; } if (!this->items.Init()) { SysPushErrorNested(); return false; } if (!this->timers.Init(this, AuLoop::NewLSTimer(0))) { SysPushErrorNested(); return false; } this->ToQueue()->SourceAdd(this->items.cvEvent); return true; } bool IOProcessor::QueueIOEvent(const AuSPtr &ioEvent) { return this->items.AddFrameOrFallback(AuStaticCast(ioEvent)); } AuUInt32 IOProcessor::TryTick() { if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); return 0; } AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameRunThreadIO(); FrameRunCheckLSes(); bool bHasAlertedItems = this->items.workSignaled.size() || this->timers.nbTicker.HasPassed(); if (!bHasAlertedItems) { ReportState(EIOProcessorEventStage::eFrameEndOfFrame); return 0; } return FrameRunEpilogue(); } bool IOProcessor::TickForRegister(const AuSPtr &ioEvent) { return this->items.AddFrameOrFallback(AuStaticCast(ioEvent)); } void IOProcessor::TickForHack(const AuSPtr &ioEvent) { SysAssert(this->TickForRegister(ioEvent)); if (AuExchange(this->bScheduled_, true)) { return; } AuStaticCast(this->loopQueue)->AddHook([that = AuSharedFromThis()]() { that->RunTick(); }); } AuUInt32 IOProcessor::TickFor(const AuSPtr &ioEvent) { SysAssert(TickForRegister(ioEvent)); if (IsTickOnly()) { return 0; } if (this->bFrameStart) { return 0; } if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); return 0; } AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameRunThreadIO(); FrameRunCheckLSes(); return FrameRunEpilogue(); } AuUInt32 IOProcessor::RunTick() { if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); AuThreading::Sleep(1); return 0; } this->bScheduled_ = false; this->bFrameStart = false; AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameWaitForAny(0); FramePumpWaitingBlocked(); FrameRunThreadIO(); FrameRunCheckLSes(); return FrameRunEpilogue(); } AuUInt32 IOProcessor::ManualTick() { if (!CheckThread()) { SysPushErrorGeneric("Wrong Thread"); return 0; } this->bFrameStart = false; AU_LOCK_GUARD(this->items.mutex); FrameStart(); FrameRunThreadIO(); FrameRunCheckLSes(); return FrameRunEpilogue(); } void IOProcessor::FrameRunIOWorkUnits() { AU_LOCK_GUARD(this->items.mutex); while (this->workUnits.size()) { for (auto workItems : AuExchange(this->workUnits, {})) { workItems->OnRun(); } } } AuUInt IOProcessor::FrameRunEpilogue() { FrameRunAlerted(); FrameRunAlertedSniffers(); FrameRunIOWorkUnits(); FrameEndOfFrameEvents(); return FrameFinalize(); } void IOProcessor::FrameRunCheckLSes() { if (!this->IsTickOnly()) { return; } for (auto &item : this->items.allItems) { if (item->item->IsRunOnSelfIO() && item->item->IsRunOnSelfIOCheckedOnTimerTick()) { auto ls = item->item->GetSelfIOSource(); if (ls && ls->IsSignaled()) { // Should deadlock without critical sections this->QueueIOEvent(item); } } } } void IOProcessor::FrameRunAlerted() { for (auto &a : this->items.onTickReceivers) { if (a->listener) { try { a->listener->Tick_RunOnTick(); if (!AuExists(this->items.workSignaled, a) && !AuExists(this->items.onTickReceivers, a)) { a->listener->Tick_Any(); } } catch (...) { SysPushErrorCatch(); } } SysAssert(AuTryInsert(this->items.finalizeQueue, a)); } for (auto &a : this->items.workSignaled) { if (a->listener) { try { a->listener->Tick_SelfIOEvent(); a->listener->Tick_Any(); } catch (...) { SysPushErrorCatch(); } } SysAssert(AuTryInsert(this->items.finalizeQueue, a)); } } void IOProcessor::FrameRunAlertedSniffers() { if (this->items.workSignaled.empty()) { return; } for (auto &a : this->items.onOtherReceivers) { if (AuExists(this->items.workSignaled, a)) { continue; } if (a->listener) { try { a->listener->Tick_OtherIOEvent(); a->listener->Tick_Any(); } catch (...) { SysPushErrorCatch(); SysAssert(a->FailWatch()); } } this->items.finalizeQueue.push_back(a); } } void IOProcessor::FrameEndOfFrameEvents() { ReportState(EIOProcessorEventStage::eFrameWorkDone); for (auto &pumped : this->items.finalizeQueue) { if (!pumped->listener) { continue; } try { pumped->listener->Tick_FrameEpilogue(); } catch (...) { SysPushErrorCatch(); pumped->FailWatch(); } } auto hasRemoved = this->items.crossThreadAbort.size(); for (auto itr = this->items.crossThreadAbort.begin(); itr != this->items.crossThreadAbort.end(); ) { bool fatal = itr->second; auto item = itr->first; this->ClearProcessor(item, fatal); itr = this->items.crossThreadAbort.erase(itr); } if (hasRemoved) { ToQueue()->Commit(); } } void IOProcessor::ClearProcessor(const AuSPtr &processor, bool fatal) { if (!AuTryRemove(this->items.allItems, processor)) { return; } try { if (fatal) { processor->listener->OnFailureCompletion(); } else { processor->listener->OnNominalCompletion(); } } catch (...) { SysPushErrorCatch(); } auto item = processor->item; if (item->IsRunOnTick()) { if (!AuTryRemove(this->items.onTickReceivers, processor)) { SysPushErrorMem(); } } if (item->IsRunOnOtherTick()) { if (!AuTryRemove(this->items.onOtherReceivers, processor)) { SysPushErrorMem(); } } if (item->IsRunOnSelfIO()) { if (!AuTryRemove(this->items.registeredIO, processor)) { SysPushErrorMem(); } auto src = item->GetSelfIOSource(); if (!src) { SysPushErrorGeneric(); return; } if (!this->ToQueue()->SourceRemove(src)) { SysPushErrorNested("IO Remove Error [!!!]"); } } } AuUInt IOProcessor::FrameFinalize() { auto C = this->items.finalizeQueue.size(); this->items.workSignaled.clear(); this->items.finalizeQueue.clear(); ReportState(EIOProcessorEventStage::eFrameEndOfFrame); this->bFrameStart = false; return C; } bool IOProcessor::InternalIsTickReady() { // TODO: tickless io ad-hoc event-as-condvar? return this->items.workSignaled.size() || this->items.crossThreadAbort.size(); } bool IOProcessor::FrameWaitForAny(AuUInt32 msMax) { if (InternalIsTickReady()) { return true; } if (this->IsTickOnly()) { auto next = this->timers.nbTicker.nextTriggerTime; auto now = AuTime::CurrentInternalClockNS(); if (now >= next) { return true; } auto delay = AuNSToMS(next - now); #if 0 if (delay == 1) { bool ret {}; while (IO::IOYield()); } else #endif { return IO::WaitFor(msMax ? AuMin(msMax, delay) : delay, true); } } else { return this->loopQueue->WaitAny(msMax); } } bool IOProcessor::SubmitIOWorkItem(const AuSPtr &work) { AU_LOCK_GUARD(this->items.mutex); // < critical section / reentrant | can nest submission return AuTryInsert(this->workUnits, work); } bool IOProcessor::AddEventListener(const AuSPtr &eventListener) { AU_LOCK_GUARD(this->listenersSpinLock); return AuTryInsert(this->listeners, eventListener); } void IOProcessor::RemoveEventListener(const AuSPtr &eventListener) { AU_LOCK_GUARD(this->listenersSpinLock); AuTryRemove(this->listeners, eventListener); } void IOProcessor::DispatchFrame(ProcessInfo &info) { this->ManualTick(); auto ns = this->refreshRateNs; if (ns) { info = AuAsync::ETickType::eSchedule; info.reschedNs = ns; } } void IOProcessor::FramePumpWaitingBlocked() { auto blocked = this->items.GetBlockedSignals(); if (blocked.size()) { this->items.workSignaled.insert(this->items.workSignaled.end(), blocked.begin(), blocked.end()); } } void IOProcessor::FrameStart() { ReportState(EIOProcessorEventStage::eFrameStartOfFrame); this->bFrameStart = true; FramePumpWaitingBlocked(); } void IOProcessor::FrameRunThreadIO() { ReportState(EIOProcessorEventStage::eFrameYieldIO); while (IO::IOYield()); ReportState(EIOProcessorEventStage::eFrameIODone); } AuUInt64 IOProcessor::SetRefreshRate(AuUInt64 ns) { if (!CheckThread()) { AU_THROW_STRING("Wrong Thread"); } bool changed = bool(ns) != bool(this->refreshRateNs); auto old = AuExchange(this->refreshRateNs, ns); UpdateTimers(); if (changed) { if (ns) { AddTimer(); } else { RemoveTimer(); } } return old; } bool IOProcessor::HasRefreshRate() { return bool(this->refreshRateNs); } bool IOProcessor::MultiplexRefreshRateWithIOEvents() { return this->mutliplexIOAndTimer; } AuUInt64 IOProcessor::GetOwnedThreadId() { return this->threadId; } void IOProcessor::UpdateTimers() { this->timers.lsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs); this->timers.nbTicker.nsTimeStep = this->refreshRateNs; if (!this->timers.nbTicker.nextTriggerTime) { this->timers.nbTicker.nextTriggerTime = AuTime::CurrentInternalClockNS() + this->timers.nbTicker.nsTimeStep; } } void IOProcessor::AddTimer() { if (IsAsync()) { StartAsyncTimerIfAny(); } else { AddTimerLS(); } } void IOProcessor::AddTimerLS() { this->ToQueue()->SourceAdd(this->timers.lsTicker); this->ToQueue()->AddCallback(this->timers.lsTicker, AuSPtr(AuSharedFromThis(), &this->timers)); this->ToQueue()->Commit(); } void IOProcessor::StartAsyncTimerIfAny() { if (!this->workItem) { this->workItem = this->asyncWorker.pool->NewWorkItem(this->asyncWorker, AuSharedFromThis()); } this->workItem->SetSchedTimeNs(this->refreshRateNs); this->workItem->Dispatch(); } void IOProcessor::RemoveTimer() { if (IsAsync()) { CancelWorkItem(); } else { RemoveLSTimer(); } } void IOProcessor::CancelWorkItem() { if (!this->workItem) { return; } this->workItem->Cancel(); this->workItem.reset(); } void IOProcessor::RemoveLSTimer() { auto queue = this->ToQueue(); if (!queue) { return; } queue->SourceRemove(this->timers.lsTicker); } bool IOProcessor::IsAsync() { return bool(this->asyncWorker.pool); } bool IOProcessor::IsTickOnly() { return !this->mutliplexIOAndTimer && this->timers.nbTicker.nextTriggerTime; } bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr &processor) { return {}; } AuSPtr IOProcessor::StartIOWatchEx(const AuSPtr& object, const AuSPtr& listener, bool singleshot) { if (!CheckThread()) { AU_THROW_STRING("Wrong Thread"); } auto item = AuMakeShared(); item->parent = this; item->listener = listener; item->item = object; item->singleshot = singleshot; AU_LOCK_GUARD(this->items.mutex); this->items.allItems.push_back(item); if (object->IsRunOnTick()) { if (!AuTryInsert(this->items.onTickReceivers, item)) { SysPushErrorMem(); return {}; } } if (object->IsRunOnOtherTick()) { if (!AuTryInsert(this->items.onOtherReceivers, item)) { SysPushErrorMem(); return {}; } } if (object->IsRunOnSelfIO()) { auto src = object->GetSelfIOSource(); if (!src) { SysPushErrorGeneric(); return {}; } auto timeout = object->IOTimeoutInMS(); if (timeout) { if (!this->ToQueue()->SourceAddWithTimeout(src, timeout)) { SysPushErrorNested("IO Error"); return {}; } } else { if (!this->ToQueue()->SourceAdd(src)) { SysPushErrorNested("IO Error"); return {}; } } if (!this->ToQueue()->AddCallbackEx(src, item)) { SysPushErrorNested("IO Error"); SysAssert(this->ToQueue()->SourceRemove(src)); return {}; } if (!this->ToQueue()->Commit()) { SysPushErrorIO("midframe recommit"); } if (!AuTryInsert(this->items.registeredIO, item)) { SysPushErrorMem(); SysAssert(this->ToQueue()->SourceRemove(src)); return {}; } } return item; } AuSPtr IOProcessor::StartSimpleIOWatchEx(const AuSPtr& object, const AuSPtr& listener, bool singleshot) { auto adapter = DesimplifyIOEventListenerAdapter(listener); if (!adapter) { SysPushErrorMem(); return {}; } return this->StartIOWatchEx(object, adapter, singleshot); } AuSPtr IOProcessor::StartSimpleLSWatchEx(const AuSPtr& source, const AuSPtr& listener, bool singleshot, AuUInt32 msTimeout) { auto sourceAdapter = NewWaitableLoopSourceEx(source, msTimeout); if (!sourceAdapter) { SysPushErrorMem(); return {}; } return this->StartSimpleIOWatchEx(sourceAdapter, listener, singleshot); } AuSPtr IOProcessor::StartIOWatch(const AuSPtr &object, const AuSPtr &listener) { return StartIOWatchEx(object, listener, false); } AuSPtr IOProcessor::StartSimpleIOWatch(const AuSPtr &object, const AuSPtr &listener) { return this->StartSimpleIOWatchEx(object, listener, false); } AuSPtr IOProcessor::StartSimpleLSWatch(const AuSPtr &source, const AuSPtr &listener) { return this->StartSimpleLSWatchEx(source, listener, false, 0); } AuSPtr IOProcessor::ToPipeProcessor() { if (!CheckThread()) { AU_THROW_STRING("Wrong Thread"); } return AuSPtr(AuSharedFromThis(), &this->streamProcessors); } AuSPtr IOProcessor::ToQueue() { return this->loopQueue; } void IOProcessor::ReleaseAllWatches() { RemoveTimer(); auto queue = ToQueue(); if (queue) { if (this->items.cvEvent) { queue->SourceRemove(this->items.cvEvent); } for (auto &io : this->items.registeredIO) { queue->SourceRemove(io->item->GetSelfIOSource()); } queue->Commit(); } } bool IOProcessor::HasItems() { return bool(this->items.allItems.size()); } void IOProcessor::ReportState(EIOProcessorEventStage stage) { AU_LOCK_GUARD(this->listenersSpinLock); for (auto &listeners : this->listeners) { try { listeners->OnIOProcessorStateEvent(stage); } catch (...) { SysPushErrorCatch(); } } } bool IOProcessor::CheckThread() { if (this->threadId == 0) { this->threadId = AuThreads::GetThreadId(); return true; } return this->threadId == AuThreads::GetThreadId(); } AUKN_SYM AuSPtr NewIOProcessor(bool tickOnly, const AuSPtr &queue) { auto processor = AuMakeShared(0, tickOnly, AuAsync::WorkerPId_t {}, queue); if (!processor) { SysPushErrorMem(); return {}; } if (!processor->Init()) { SysPushErrorNested(); return {}; } return processor; } AUKN_SYM AuSPtr NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id) { if (!id.pool) { return {}; } auto thread = id.pool->ResolveHandle(id); if (!thread) { SysPushErrorGeneric("Worker PID failed to resolve to a thread object"); return {}; } auto queue = id.pool->ToKernelWorkQueue(id); if (!queue) { SysPushErrorGeneric("Worker PID has no kernel work queue"); return {}; } auto processor = AuMakeShared(AuUInt(thread.get()), tickOnly, id, queue); if (!processor) { SysPushErrorMem(); return {}; } if (!processor->Init()) { SysPushErrorNested(); return {}; } return processor; } AUKN_SYM AuSPtr NewIOProcessorNoQueue(bool tickOnly) { auto loop = AuLoop::NewLoopQueue(); if (!loop) { SysPushErrorMem(); return {}; } return NewIOProcessor(tickOnly, loop); } }