AuroraRuntime/Source/IO/Loop/Loop.cpp

475 lines
13 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Loop.cpp
Date: 2021-9-21
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Loop.hpp"
#include "ILoopSourceEx.hpp"
#include <Source/Threading/Primitives/SMTYield.hpp>
namespace Aurora::IO::Loop
{
#if !defined(AURORA_IS_MODERNNT_DERIVED)
// Stub compiled only when AURORA_IS_MODERNNT_DERIVED is not defined.
// The boolean argument is ignored and an empty pointer is always returned.
AUKN_SYM AuSPtr<ILoopSource> NewLSWin32Source(bool)
{
    return AuSPtr<ILoopSource> {};
}
#endif
#if !defined(AURORA_IS_XNU_DERIVED)
// Stub compiled only when AURORA_IS_XNU_DERIVED is not defined.
// Always returns an empty pointer.
AUKN_SYM AuSPtr<ILoopSource> NewLSAppleSource()
{
    return AuSPtr<ILoopSource> {};
}
#endif
/**
 * Obtains a loop source from an asynchronous file transaction.
 *
 * @param pFileTransaction transaction to ask for a loop source; may be null.
 * @return whatever pFileTransaction->NewLoopSource() yields, or an empty
 *         pointer (after pushing an argument error) when the transaction is null.
 */
AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<AuIO::IAsyncTransaction> &pFileTransaction)
{
    if (pFileTransaction)
    {
        return pFileTransaction->NewLoopSource();
    }

    // Null transaction: record the bad argument and hand back nothing.
    SysPushErrorArg();
    return {};
}
/**
 * Returns the loop source provided by AuConsole::StdInBufferLoopSource().
 */
AUKN_SYM AuSPtr<ILoopSource> NewStdIn()
{
    AuSPtr<ILoopSource> pStdInSource = AuConsole::StdInBufferLoopSource();
    return pStdInSource;
}
/**
 * Resolves a worker id to its loop source.
 *
 * When workerPid carries a pool, that pool performs the lookup; otherwise the
 * object returned by Async::GetAsyncApp() is asked instead.
 */
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid)
{
    if (workerPid.pool)
    {
        return workerPid.pool->WorkerToLoopSource(workerPid);
    }

    // No explicit pool attached to the id: fall back to the async app.
    return Async::GetAsyncApp()->WorkerToLoopSource(workerPid);
}
#if defined(AURORA_IS_MODERNNT_DERIVED)
// Wait-any backend used by WaitMultipleLoopSources when AURORA_IS_MODERNNT_DERIVED is defined.
// Declared here only; the definition is not in this part of the file.
// NOTE: parameter order differs from the fallback below (bZeroTick precedes timeout).
AuList<AuSPtr<ILoopSource>> WaitMultipleOrObjects(const AuList<AuSPtr<ILoopSource>> &objects, bool bZeroTick, AuUInt32 timeout);
#endif
// Wait-any backend used by WaitMultipleLoopSources on every other platform; defined later in this file.
// bTimeout is an out-parameter that reports whether the blocking wait failed.
AuList<AuSPtr<ILoopSource>> WaitMultipleOrObjectsFallback(const AuList<AuSPtr<ILoopSource>> &objects, AuUInt32 timeout, bool bZeroTick, bool &bTimeout);
/**
 * Gives back a loop source that was acquired during a wait that did not
 * complete as a whole.
 *
 * Null pointers and sources whose type is ELoopSource::eSourceWin32 are left
 * alone. Otherwise the first matching interface is used to undo the acquire:
 * a mutex is unlocked, an event is set, a semaphore gets one count added.
 * Sources implementing none of these interfaces are ignored.
 */
void ResetLoopSourceFalseAlarm(const AuSPtr<Loop::ILoopSource> &pLoopSource)
{
    if (!pLoopSource ||
        pLoopSource->GetType() == ELoopSource::eSourceWin32)
    {
        return;
    }

    if (auto pAsMutex = AuDynamicCast<Loop::ILSMutex>(pLoopSource))
    {
        pAsMutex->Unlock();
    }
    else if (auto pAsEvent = AuDynamicCast<Loop::ILSEvent>(pLoopSource))
    {
        pAsEvent->Set();
    }
    else if (auto pAsSemaphore = AuDynamicCast<Loop::ILSSemaphore>(pLoopSource))
    {
        pAsSemaphore->AddOne();
    }
}
/**
 * Waits on a list of loop sources.
 *
 * @param lsList       sources to wait on; null entries are tolerated and are
 *                     reported back as empty pointers in `signaled`.
 * @param signaled     output; cleared on entry, then filled with the sources
 *                     that were observed signaled.
 * @param bAny         false: every source must be acquired (wait-all);
 *                     true: return as soon as at least one is signaled (wait-any).
 * @param optTimeoutMS no value: block indefinitely; 0: poll without blocking;
 *                     otherwise: total time budget in milliseconds.
 * @return true when lsList is empty. Single source: the result of the wait or poll.
 *         Wait-all: true only if every entry was acquired; on failure `signaled`
 *         is emptied and the already-acquired sources are handed back through
 *         ResetLoopSourceFalseAlarm. Wait-any: false on timeout, otherwise
 *         whether anything was signaled.
 */
AUKN_SYM bool WaitMultipleLoopSources(const AuList<AuSPtr<Loop::ILoopSource>> &lsList,
                                      AuList<AuSPtr<Loop::ILoopSource>> &signaled,
                                      bool bAny,
                                      AuOptionalEx<AuUInt32> optTimeoutMS)
{
    signaled.clear();

    if (lsList.empty())
    {
        return true;
    }

    bool bZeroTick      { optTimeoutMS && optTimeoutMS.value() == 0 };
    bool bHasTimeOut    { optTimeoutMS && optTimeoutMS.value() };
    bool bSleepForever  { !optTimeoutMS };

    // Waits on one non-null source using the caller's complete budget.
    // WaitOn(0) is the "no timeout" form used for the sleep-forever case.
    auto WaitWithFullBudget = [&](const AuSPtr<Loop::ILoopSource> &pSource) -> bool
    {
        if (bSleepForever)
        {
            return pSource->WaitOn(0);
        }

        if (bHasTimeOut)
        {
            return pSource->WaitOn(optTimeoutMS.value());
        }

        return pSource->IsSignaled();
    };

    // Fast path: exactly one source, no bookkeeping required.
    if (lsList.size() == 1)
    {
        auto pSource = lsList[0];
        if (!pSource)
        {
            signaled.push_back({});
            return true;
        }

        bool bStatus = WaitWithFullBudget(pSource);
        if (bStatus)
        {
            signaled.push_back(pSource);
        }
        return bStatus;
    }

    // Absolute deadline on the steady clock; 0 means "no deadline".
    AuUInt64 uTimeoutEnd = bHasTimeOut ?
                           AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(optTimeoutMS.value()) :
                           0;

    // Returns the milliseconds left before the deadline and switches to polling
    // (bZeroTick) once the budget is exhausted. The subtraction is only performed
    // while the deadline is still ahead, so the unsigned difference cannot wrap.
    auto RemainingBudgetMS = [&]() -> AuUInt32
    {
        if (!uTimeoutEnd)
        {
            return 0;
        }

        auto uNow = Time::SteadyClockNS();
        if (uNow >= uTimeoutEnd)
        {
            bZeroTick = true;
            return 0;
        }

        auto uLeftMS = static_cast<AuUInt32>(AuNSToMS<AuInt64>(uTimeoutEnd - uNow));
        if (!uLeftMS)
        {
            // Less than one millisecond left: treat as expired.
            bZeroTick = true;
        }
        return uLeftMS;
    };

    AU_DEBUG_MEMCRUNCH;

    signaled.reserve(lsList.size());

    if (!bAny)
    {
        // Indices of the sources acquired so far, kept for rollback on failure.
        AuList<AuUInt32> reverseList;
        bool bFirstOk { true };

        // The first entry is waited on with the full budget.
        auto &entryZero = lsList[0];
        if (!entryZero)
        {
            signaled.push_back({});
        }
        else if (WaitWithFullBudget(entryZero))
        {
            reverseList.push_back(0);
            signaled.push_back(entryZero);
        }
        else
        {
            bFirstOk = false;
        }

        if (bFirstOk)
        {
            // Remaining entries share whatever is left of the budget.
            for (AU_ITERATE_N_TO_X(i, 1, lsList.size()))
            {
                const auto &pEntry = lsList[i];
                if (!pEntry)
                {
                    // Same policy as entry zero: a null entry counts as satisfied.
                    // Without this check the calls below dereference a null pointer.
                    signaled.push_back({});
                    continue;
                }

                AuUInt32 uTimeoutMS = RemainingBudgetMS();

                bool bAcquired = bZeroTick ?
                                 pEntry->IsSignaled() :
                                 pEntry->WaitOn(uTimeoutMS);
                if (!bAcquired)
                {
                    break;
                }

                reverseList.push_back(i);
                signaled.push_back(pEntry);
            }
        }

        if (signaled.size() == lsList.size())
        {
            return true;
        }

        // Partial acquisition: report nothing and give back what was taken.
        signaled.clear();
        for (const auto &uIndex : reverseList)
        {
            ResetLoopSourceFalseAlarm(lsList[uIndex]);
        }
        return false;
    }

    bool bTimedout {};
    AuList<AuSPtr<Loop::ILoopSource>> signalTemp;  // hits found by the polling pass
    auto lsList2 = lsList;                         // sources still to be waited on
    bool bAnyFound {};                             // saw at least one mutex/semaphore/event

    // One polling pass over lsList2:
    //  - null entries move to signalTemp as empty pointers;
    //  - signaled mutex/semaphore/event sources move to signalTemp;
    //  - unsignaled ones are dropped only on the last pass of a zero-tick wait;
    //  - all other source types stay in lsList2 for the backend wait below.
    auto DoTheThing = [&](bool bLastTick)
    {
        for (auto itr = lsList2.begin();
             itr != lsList2.end();
             )
        {
            auto pSource = *itr;

            if (!pSource)
            {
                signalTemp.push_back({});
                itr = lsList2.erase(itr);
                continue;
            }

            auto eType = pSource->GetType();
            if (eType == ELoopSource::eSourceMutex ||
                eType == ELoopSource::eSourceSemaphore ||
                eType == ELoopSource::eSourceEvent)
            {
                auto pSourceEx = AuDynamicCast<Loop::ILoopSourceEx>(pSource);
                bAnyFound = true;

                if ((pSourceEx && pSourceEx->IsSignaledNoSpinIfUserland()) ||
                    (!pSourceEx && pSource->IsSignaled()))
                {
                    signalTemp.push_back(pSource);
                    itr = lsList2.erase(itr);
                }
                else if (bLastTick && bZeroTick)
                {
                    itr = lsList2.erase(itr);
                }
                else
                {
                    itr++;
                }
            }
            else
            {
                itr++;
            }
        }
    };

    if (gRuntimeConfig.threadingConfig.bPlatformIsSMPProcessorOptimized &&
        !bZeroTick)
    {
        // Spin on the polling pass until something was removed from lsList2
        // or there is nothing of a pollable type to spin on.
        AuThreadPrimitives::DoTryIf([&]()
        {
            DoTheThing(false);
            return lsList2.size() != lsList.size() ||
                   !bAnyFound;
        });

        if (bAnyFound)
        {
            DoTheThing(true);
        }
    }
    else
    {
        DoTheThing(true);
    }

    if (lsList2.size())
    {
        // Something already fired, so only poll the remainder instead of blocking.
        bZeroTick |= bool(signalTemp.size());

        AuUInt32 uTimeoutMS = RemainingBudgetMS();

    #if defined(AURORA_IS_MODERNNT_DERIVED)
        signaled = WaitMultipleOrObjects(lsList2, bZeroTick, uTimeoutMS);
        bTimedout = uTimeoutEnd && uTimeoutMS && !bZeroTick ?
                    Time::SteadyClockNS() >= uTimeoutEnd :
                    false;
    #else
        signaled = WaitMultipleOrObjectsFallback(lsList2, uTimeoutMS, bZeroTick, bTimedout);
        bTimedout &= !bZeroTick;
    #endif
    }

    signaled.insert(signaled.end(), signalTemp.begin(), signalTemp.end());

    if (bTimedout)
    {
        return false;
    }

    return !signaled.empty();
}
/**
 * Wait-any backend built on a temporary loop queue.
 *
 * Every non-null entry of `objects` is added to a fresh queue, a listener
 * records which sources fire, and the queue is either pumped once
 * (bZeroTick) or waited on with `timeout`.
 *
 * @param objects   sources to wait on; null entries are skipped.
 * @param timeout   value forwarded to the queue's WaitAny() when blocking.
 * @param bZeroTick true: pump without blocking and ignore `timeout`.
 * @param bTimeout  out; always written. Set to true when the blocking
 *                  WaitAny() call reports failure, false otherwise.
 * @return the sources found signaled; empty on any setup failure.
 */
AuList<AuSPtr<ILoopSource>> WaitMultipleOrObjectsFallback(const AuList<AuSPtr<ILoopSource>> &objects, AuUInt32 timeout, bool bZeroTick, bool &bTimeout)
{
    // Give the out-parameter a defined value before any early return.
    bTimeout = false;

    if (objects.empty())
    {
        return {};
    }

    AuList<AuSPtr<ILoopSourceEx>> loopSourceExs;
    AuList<AuSPtr<ILoopSource>> triggered;

    auto pQueue = AuLoop::NewLoopQueue();
    if (!pQueue)
    {
        return {};
    }

    try
    {
        loopSourceExs.reserve(objects.size());
        // Reserve room for the results up front. The listener below appends
        // to `triggered` from inside an AU_DEBUG_MEMCRUNCH scope.
        // (The previous reserve(triggered.size()) reserved zero elements.)
        triggered.reserve(objects.size());

        for (const auto &source : objects)
        {
            if (!source)
            {
                continue;
            }

            if (!pQueue->SourceAdd(source))
            {
                return {};
            }

            // Win32 sources take no part in the presleep/finish-sleep pass below.
            if (source->GetType() == ELoopSource::eSourceWin32)
            {
                continue;
            }

            if (auto pLoopSourceEx = AuDynamicCast<ILoopSourceEx>(source))
            {
                if (!AuTryInsert(loopSourceExs, pLoopSourceEx))
                {
                    return {};
                }
            }
        }
    }
    catch (...)
    {
        return {};
    }

    // Collects every source the queue reports.
    // NOTE(review): the meaning of the `false` return is defined by
    // ILoopSourceSubscriberFunctional, which is not visible here - confirm.
    auto pListener = AuMakeSharedThrow<AuLoop::ILoopSourceSubscriberFunctional>([&](const AuSPtr<ILoopSource> &source)
    {
        AU_DEBUG_MEMCRUNCH;
        triggered.push_back(source);
        return false;
    });

    if (!pQueue->AddCallback(pListener))
    {
        return {};
    }

    for (const auto &source : loopSourceExs)
    {
        source->OnPresleep();
    }

    if (bZeroTick)
    {
        (void)pQueue->PumpNonblocking();
    }
    else
    {
        bTimeout = !pQueue->WaitAny(timeout);
    }

    for (const auto &pLoopSource : loopSourceExs)
    {
        // Re-poll sources the queue did not report. After a timeout only
        // mutex/semaphore/event sources are re-polled.
        if (std::find(triggered.begin(), triggered.end(), pLoopSource) == triggered.end())
        {
            auto eType = pLoopSource->GetType();
            if (!bTimeout ||
                eType == ELoopSource::eSourceMutex ||
                eType == ELoopSource::eSourceSemaphore ||
                eType == ELoopSource::eSourceEvent)
            {
                if (pLoopSource->IsSignaledNoSpinIfUserland())
                {
                    triggered.push_back(pLoopSource);
                }
            }
        }

        // Every source that got OnPresleep() gets the matching OnFinishSleep().
        pLoopSource->OnFinishSleep();
    }

    return triggered;
}
}