/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: Loop.cpp Date: 2021-9-21 Author: Reece ***/ #include #include "Loop.hpp" #include "ILoopSourceEx.hpp" #include namespace Aurora::IO::Loop { #if !defined(AURORA_IS_MODERNNT_DERIVED) AUKN_SYM AuSPtr NewLSWin32Source(bool) { return {}; } #endif #if !defined(AURORA_IS_XNU_DERIVED) AUKN_SYM AuSPtr NewLSAppleSource() { return {}; } #endif AUKN_SYM AuSPtr NewLSFile(const AuSPtr &pFileTransaction) { if (!pFileTransaction) { SysPushErrorArg(); return {}; } return pFileTransaction->NewLoopSource(); } AUKN_SYM AuSPtr NewStdIn() { return AuConsole::StdInBufferLoopSource(); } AUKN_SYM AuSPtr NewLSAsync(Async::WorkerPId_t workerPid) { if (!workerPid.pool) { return Async::GetAsyncApp()->WorkerToLoopSource(workerPid); } return workerPid.pool->WorkerToLoopSource(workerPid); } #if defined(AURORA_IS_MODERNNT_DERIVED) AuList> WaitMultipleOrObjects(const AuList> &objects, bool bZeroTick, AuUInt32 timeout); #endif AuList> WaitMultipleOrObjectsFallback(const AuList> &objects, AuUInt32 timeout, bool bZeroTick, bool &bTimeout); void ResetLoopSourceFalseAlarm(const AuSPtr &pLoopSource) { if (!pLoopSource) { return; } if (pLoopSource->GetType() == ELoopSource::eSourceWin32) { return; } if (auto pMutex = AuDynamicCast(pLoopSource)) { pMutex->Unlock(); return; } if (auto pEvent = AuDynamicCast(pLoopSource)) { pEvent->Set(); return; } if (auto pSemaphore = AuDynamicCast(pLoopSource)) { pSemaphore->AddOne(); return; } } AUKN_SYM bool WaitMultipleLoopSources(const AuList> &lsList, AuList> &signaled, bool bAny, AuOptionalEx optTimeoutMS) { signaled.clear(); AuList reverseList; if (lsList.empty()) { return true; } if (lsList.size() == 1) { auto pSource = lsList[0]; if (!pSource) { return false; } bool bStatus {}; if (optTimeoutMS) { bStatus = pSource->WaitOn(optTimeoutMS.value()); } else { bStatus = pSource->IsSignaled(); } if (bStatus) { signaled.push_back(pSource); } return bStatus; } AuUInt64 uTimeoutEnd = optTimeoutMS.has_value() && optTimeoutMS.value() ? AuTime::SteadyClockNS() + AuMSToNS(optTimeoutMS) : 0; bool bZeroTick { optTimeoutMS && optTimeoutMS.value() == 0 }; AU_DEBUG_MEMCRUNCH; signaled.reserve(lsList.size()); if (!bAny) { auto &entryZero = lsList[0]; if (!entryZero) { signaled.push_back({}); } if (entryZero) { bool bStatus {}; if (!optTimeoutMS || optTimeoutMS.value() == 0) { bStatus = entryZero->WaitOn(0); } else if (bZeroTick) { bStatus = entryZero->IsSignaled(); } else if (optTimeoutMS) { bStatus = entryZero->WaitOn(optTimeoutMS.value()); } if (!bStatus) { goto next; } else { reverseList.push_back(0); signaled.push_back(entryZero); } } if (lsList.size() > 1) { for (AU_ITERATE_N_TO_X(i, 1, lsList.size())) { AuUInt32 uTimeoutMS {}; if (uTimeoutEnd) { auto uStartTime = Time::SteadyClockNS(); if (uStartTime >= uTimeoutEnd) { #if 0 break; #else bZeroTick = true; #endif } uTimeoutMS = AuNSToMS(uTimeoutEnd - uStartTime); if (!uTimeoutMS) { #if 0 break; #else bZeroTick = true; #endif } } if (bZeroTick) { if (!lsList[i]->IsSignaled()) { break; } } else { if (!lsList[i]->WaitOn(uTimeoutMS)) { break; } } reverseList.push_back(i); signaled.push_back(lsList[i]); } } next: bool bReturnStatus { true }; if (signaled.size() != lsList.size()) { bReturnStatus = false; signaled.clear(); for (const auto &uIndex : reverseList) { ResetLoopSourceFalseAlarm(lsList[uIndex]); } } return bReturnStatus; } else { bool bTimedout {}; AuList> signalTemp; auto lsList2 = lsList; bool bAnyFound {}; auto DoTheThing = [&]() { for (auto itr = lsList2.begin(); itr != lsList2.end(); ) { auto pSource = *itr; auto eType = pSource->GetType(); if (eType == ELoopSource::eSourceMutex || eType == ELoopSource::eSourceSemaphore || eType == ELoopSource::eSourceEvent) { auto pSourceEx = AuDynamicCast(pSource); bAnyFound = true; if ((pSourceEx && pSourceEx->IsSignaledNoSpinIfUserland()) || (!pSourceEx && pSource->IsSignaled())) { signalTemp.push_back(pSource); itr = lsList2.erase(itr); } else { itr++; } } else { itr++; } } }; if (gRuntimeConfig.threadingConfig.bPlatformIsSMPProcessorOptimized && !bZeroTick) { AuThreadPrimitives::DoTryIf([&]() { DoTheThing(); return lsList2.size() != lsList.size() || !bAnyFound; }); if (bAnyFound) { DoTheThing(); } } else { DoTheThing(); } if (lsList2.size()) { bZeroTick |= bool(signalTemp.size()); AuUInt32 uTimeoutMS {}; if (uTimeoutEnd) { auto uStartTime = Time::SteadyClockNS(); if (uStartTime >= uTimeoutEnd) { bZeroTick = true; } uTimeoutMS = AuNSToMS(uTimeoutEnd - uStartTime); if (!uTimeoutMS) { bZeroTick = true; } } #if defined(AURORA_IS_MODERNNT_DERIVED) signaled = WaitMultipleOrObjects(lsList2, bZeroTick, uTimeoutMS); bTimedout = uTimeoutEnd && uTimeoutMS && !bZeroTick ? Time::SteadyClockNS() >= uTimeoutEnd : false; #else signaled = WaitMultipleOrObjectsFallback(lsList2, uTimeoutMS, bZeroTick, bTimedout); bTimedout &= !bZeroTick; #endif } signaled.insert(signaled.end(), signalTemp.begin(), signalTemp.end()); if (bTimedout) { return false; } else { return signaled.size(); } } } AuList> WaitMultipleOrObjectsFallback(const AuList> &objects, AuUInt32 timeout, bool bZeroTick, bool &bTimeout) { AuList> loopSourceExs; AuList> triggered; if (objects.empty()) { return {}; } bTimeout = false; auto pQueue = AuLoop::NewLoopQueue(); if (!pQueue) { return {}; } try { loopSourceExs.reserve(objects.size()); triggered.reserve(triggered.size()); for (const auto &source : objects) { if (!source) { continue; } if (!pQueue->SourceAdd(source)) { return {}; } if (source->GetType() == ELoopSource::eSourceWin32) { continue; } if (auto pLoopSourceEx = AuDynamicCast(source)) { if (!AuTryInsert(loopSourceExs, pLoopSourceEx)) { return {}; } } } } catch (...) { return {}; } auto pListener = AuMakeSharedThrow([&](const AuSPtr &source) { AU_DEBUG_MEMCRUNCH; triggered.push_back(source); return false; }); if (!pQueue->AddCallback(pListener)) { return {}; } for (const auto &source : loopSourceExs) { source->OnPresleep(); } if (bZeroTick) { (void)pQueue->PumpNonblocking(); } else { bTimeout = !pQueue->WaitAny(timeout); } for (AU_ITERATE_N(i, loopSourceExs.size())) { auto pLoopSource = loopSourceExs[i]; if (std::find(triggered.begin(), triggered.end(), pLoopSource) == triggered.end()) { auto eType = pLoopSource->GetType(); if (!bTimeout || eType == ELoopSource::eSourceMutex || eType == ELoopSource::eSourceSemaphore || eType == ELoopSource::eSourceEvent) { if (pLoopSource->IsSignaledNoSpinIfUserland()) { triggered.push_back(pLoopSource); } } } pLoopSource->OnFinishSleep(); } return triggered; } }