[*] AuLoop::WaitMultipleLoopSources optimizations (do not allocate so much, if we can), and

[*] Updated Loop.hpp comment
This commit is contained in:
Reece Wilson 2024-09-28 22:59:37 +01:00
parent ff6409859f
commit 6e9e962c84
3 changed files with 186 additions and 97 deletions

View File

@ -19,12 +19,11 @@
namespace Aurora::IO::Loop namespace Aurora::IO::Loop
{ {
// TODO: We cannot reach true parity with NTs WaitForMultipleObjectsEx in this current state ( TODO )
// TODO: Win32 enters an alertable state, does not break // [1] There is no flag for enter/do-not-enter alertable state (processes file transactions and their subscribers, network transactions and their subscribers, APCs, etc)
// Linux does not enter an alertable state, but will soon-ish // (also see: ./../IOSleep.hpp)
// There is no flag for enter alertable state // [2] There is no explicit flag for awoken by APC ( could use true && signaled.empty() ).
// Cannot reach true parity with WaitForMultipleObjectsEx in this current state // [3] And we can't do timer callbacks in-thread with the state of our current APC implementation ( TODO ).
// Also, there is no flag for awoken by APC
// optTimeoutMS = {} | indefinite // optTimeoutMS = {} | indefinite
// optTimeoutMS = 0 | poll // optTimeoutMS = 0 | poll

View File

@ -14,64 +14,81 @@ namespace Aurora::IO::Loop
{ {
AuList<AuSPtr<ILoopSource>> WaitMultipleOrObjects(const AuList<AuSPtr<ILoopSource>> &objects, bool bZeroTick, AuUInt32 dwTimeoutReq, bool bAllowOthers, bool &bTooMany) AuList<AuSPtr<ILoopSource>> WaitMultipleOrObjects(const AuList<AuSPtr<ILoopSource>> &objects, bool bZeroTick, AuUInt32 dwTimeoutReq, bool bAllowOthers, bool &bTooMany)
{ {
bool isWinLoop; bool bIsWinLoop;
AuList<AuSPtr<ILoopSourceEx>> loopSourceExs; AuSPtr<ILoopSourceEx> loopSourceExs[MAXIMUM_WAIT_OBJECTS];
HANDLE handleArray[MAXIMUM_WAIT_OBJECTS];
AuUInt32 uCounterHandle, uCounterFD;
AuSPtr<ILoopSource> pMsgSource;
AuList<AuSPtr<ILoopSource>> triggered; AuList<AuSPtr<ILoopSource>> triggered;
AuList<HANDLE> handleArray;
AuSPtr<ILoopSource> msgSource; bIsWinLoop = false;
uCounterHandle = 0;
uCounterFD = 0;
try try
{ {
isWinLoop = false;
loopSourceExs.reserve(objects.size());
handleArray.reserve(objects.size());
triggered.reserve(triggered.size()); triggered.reserve(triggered.size());
for (const auto &source : objects)
{
if (!source)
{
continue;
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
isWinLoop = true;
msgSource = source;
continue;
}
if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
{
loopSourceExs.push_back(extended);
if (extended->Singular())
{
handleArray.push_back(reinterpret_cast<HANDLE>(extended->GetHandle()));
}
else
{
for (const auto &handle : extended->GetHandles())
{
handleArray.push_back(reinterpret_cast<HANDLE>(handle));
}
}
}
}
} }
catch (...) catch (...)
{ {
return {}; return {};
} }
if (handleArray.size() >= MAXIMUM_WAIT_OBJECTS) for (const auto &source : objects)
{ {
bTooMany = true; if (!source)
return {}; {
continue;
}
if (source->GetType() == ELoopSource::eSourceWin32)
{
bIsWinLoop = true;
pMsgSource = source;
continue;
}
if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
{
{
auto uNewIndex = uCounterHandle++;
if (uNewIndex == MAXIMUM_WAIT_OBJECTS)
{
bTooMany = true;
return {};
}
loopSourceExs[uNewIndex] = extended;
}
if (extended->Singular())
{
auto uNewIndex = uCounterFD++;
if (uNewIndex == MAXIMUM_WAIT_OBJECTS)
{
bTooMany = true;
return {};
}
handleArray[uNewIndex] = reinterpret_cast<HANDLE>(extended->GetHandle());
}
else
{
for (const auto &handle : extended->GetHandles())
{
auto uNewIndex = uCounterFD++;
if (uNewIndex == MAXIMUM_WAIT_OBJECTS)
{
bTooMany = true;
return {};
}
handleArray[uNewIndex] = reinterpret_cast<HANDLE>(extended->GetHandle());
}
}
}
} }
for (const auto &source : loopSourceExs) for (AU_ITERATE_N(i, uCounterHandle))
{ {
source->OnPresleep(); loopSourceExs[i]->OnPresleep();
} }
AuUInt32 uTimeout {}; AuUInt32 uTimeout {};
@ -83,21 +100,21 @@ namespace Aurora::IO::Loop
} }
DWORD ret; DWORD ret;
if (isWinLoop && if (bIsWinLoop &&
pMsgWaitForMultipleObjectsEx) pMsgWaitForMultipleObjectsEx)
{ {
ret = pMsgWaitForMultipleObjectsEx(handleArray.size(), handleArray.data(), uTimeout, QS_ALLPOSTMESSAGE | QS_ALLINPUT, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE); ret = pMsgWaitForMultipleObjectsEx(uCounterFD, handleArray, uTimeout, QS_ALLPOSTMESSAGE | QS_ALLINPUT, MWMO_INPUTAVAILABLE | MWMO_ALERTABLE);
} }
else else
{ {
ret = ::WaitForMultipleObjectsEx(handleArray.size(), handleArray.data(), false, uTimeout, true); ret = ::WaitForMultipleObjectsEx(uCounterFD, handleArray, false, uTimeout, true);
} }
bool error = ((ret == WAIT_TIMEOUT) || bool error = ((ret == WAIT_TIMEOUT) ||
(ret == WAIT_IO_COMPLETION) || (ret == WAIT_IO_COMPLETION) ||
(ret == WAIT_FAILED)); (ret == WAIT_FAILED));
bool isPump = WAIT_OBJECT_0 + handleArray.size() == ret; bool isPump = WAIT_OBJECT_0 + uCounterFD == ret;
AuUInt firstTriggered {}; AuUInt firstTriggered {};
if (!error) if (!error)
@ -111,7 +128,7 @@ namespace Aurora::IO::Loop
if (isPump) if (isPump)
{ {
// msg loop queue for trigger // msg loop queue for trigger
AuTryInsert(triggered, msgSource); AuTryInsert(triggered, pMsgSource);
if (!bAllowOthers) if (!bAllowOthers)
{ {
@ -119,16 +136,18 @@ namespace Aurora::IO::Loop
} }
} }
for (const auto &source : loopSourceExs) for (AU_ITERATE_N(i, uCounterHandle))
{ {
auto &pSource = loopSourceExs[i];
if (!error) if (!error)
{ {
AuUInt lastHandle {}; AuUInt lastHandle {};
bool wasTriggered {}; bool wasTriggered {};
if (source->Singular()) if (pSource->Singular())
{ {
auto handle = source->GetHandle(); auto handle = pSource->GetHandle();
if ((firstTriggered == handle) || if ((firstTriggered == handle) ||
(bAllowOthers && WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0)) (bAllowOthers && WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
{ {
@ -138,7 +157,7 @@ namespace Aurora::IO::Loop
} }
else else
{ {
for (const auto &handle : source->GetHandles()) for (const auto &handle : pSource->GetHandles())
{ {
if ((firstTriggered == handle) || if ((firstTriggered == handle) ||
(bAllowOthers && WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0)) (bAllowOthers && WaitForSingleObject(reinterpret_cast<HANDLE>(handle), 0) == WAIT_OBJECT_0))
@ -151,16 +170,16 @@ namespace Aurora::IO::Loop
} }
// notify the loopsource of ex, only signal once we've filtered the kernel signal // notify the loopsource of ex, only signal once we've filtered the kernel signal
if (wasTriggered && source->OnTrigger(lastHandle)) if (wasTriggered && pSource->OnTrigger(lastHandle))
{ {
// pog // pog
AuTryInsert(triggered, source); AuTryInsert(triggered, pSource);
} }
} }
source->OnFinishSleep(); pSource->OnFinishSleep();
} }
return triggered; return AuMove(triggered);
} }
} }

View File

@ -36,19 +36,85 @@ namespace Aurora::IO::Loop
bTooMany = true; bTooMany = true;
return false; return false;
#else #else
AuList<AuSPtr<ILoopSourceEx>> loopSourceExs;
AuList<AuSPtr<ILoopSourceEx>> loopSourceExs2;
AuList<AuSPtr<ILoopSource>> triggered; AuList<AuSPtr<ILoopSource>> triggered;
AuList<pollfd> handleArray;
AuList<AuPair<AuUInt, bool>> handleIndicies; // fallback
AuList<AuSPtr<ILoopSourceEx>> loopSourceExs1;
AuList<AuSPtr<ILoopSourceEx>> loopSourceExs21;
AuList<pollfd> handleArray1;
AuList<AuPair<AuUInt, bool>> handleIndicies1;
// 4k~ of stack
AuSPtr<ILoopSourceEx> loopSourceExs12[128];
AuSPtr<ILoopSourceEx> loopSourceExs22[128];
pollfd handleArray2[128];
AuPair<AuUInt, bool> handleIndicies2[128];
// phead
AuSPtr<ILoopSourceEx> * pLoopSourceExs1;
AuSPtr<ILoopSourceEx> * pLoopSourceExs2;
pollfd * pHandleArray;
AuPair<AuUInt, bool> * pHandleIndicies;
AuUInt32 uHandleOffset, uFDOffset, uHandleOffset2;
// length / size
uHandleOffset = 0;
uHandleOffset2 = 0;
uFDOffset = 0;
// default heads
pLoopSourceExs1 = loopSourceExs12;
pLoopSourceExs2 = loopSourceExs22;
pHandleArray = handleArray2;
pHandleIndicies = handleIndicies2;
#define HANDLE_PUSH_CHILD_Z(value, aaa, bbb, ccc, ggg, FF) \
{ \
if (FF > AuArraySize(aaa)) \
{ \
ccc.push_back(value); \
bbb = ccc.data(); \
} \
else if (FF == AuArraySize(aaa)) \
{ \
ccc.insert(ccc.begin(), &ggg[0], &ggg[AuArraySize(aaa)]); \
ccc.push_back(value); \
bbb = ccc.data(); \
} \
else \
{ \
bbb[FF] = value; \
} \
}
#define HANDLE_PUSH_MAIN2(value) \
HANDLE_PUSH_CHILD_Z(value, loopSourceExs22, pLoopSourceExs2, loopSourceExs21, pLoopSourceExs2, uCurHandle)
#define HANDLE_PUSH_MAIN(value, handle) \
HANDLE_PUSH_CHILD_Z(value, loopSourceExs22, pLoopSourceExs1, loopSourceExs1, pLoopSourceExs1, handle)
#define HANDLE_PUSH_CHILD(value) \
{ \
auto uCurFD = uFDOffset++; \
HANDLE_PUSH_CHILD_Z(value, loopSourceExs22, pHandleArray, handleArray1, handleArray2, uCurFD) \
}
#define HANDLE_PUSH_CHILD2(value) \
{ \
HANDLE_PUSH_CHILD_Z(value, loopSourceExs22, pHandleIndicies, handleIndicies1, handleIndicies2, (uFDOffset - 1)) \
}
try try
{ {
loopSourceExs.reserve(objects.size()); if (AuArraySize(loopSourceExs22) < objects.size())
loopSourceExs2.reserve(objects.size()); {
handleArray.reserve(objects.size()); loopSourceExs1.reserve(objects.size());
loopSourceExs21.reserve(objects.size());
handleArray1.reserve(objects.size());
handleIndicies1.reserve(triggered.size());
}
triggered.reserve(triggered.size()); triggered.reserve(triggered.size());
handleIndicies.reserve(triggered.size());
for (const auto &source : objects) for (const auto &source : objects)
{ {
@ -59,15 +125,17 @@ namespace Aurora::IO::Loop
if (auto extended = AuDynamicCast<ILoopSourceEx>(source)) if (auto extended = AuDynamicCast<ILoopSourceEx>(source))
{ {
loopSourceExs2.push_back(extended); auto uCurHandle = uHandleOffset++;
HANDLE_PUSH_MAIN2(extended)
if (extended->Singular()) if (extended->Singular())
{ {
auto handle = extended->GetHandle(); auto handle = extended->GetHandle();
auto handleWrite = extended->GetWriteHandle(); auto handleWrite = extended->GetWriteHandle();
auto i = loopSourceExs.size(); auto i = uHandleOffset2++;
loopSourceExs.push_back(extended); HANDLE_PUSH_MAIN(extended, i);
if (handle != -1) if (handle != -1)
{ {
@ -76,8 +144,8 @@ namespace Aurora::IO::Loop
poll.events = POLLIN; poll.events = POLLIN;
poll.revents = 0; poll.revents = 0;
handleArray.push_back(poll); HANDLE_PUSH_CHILD(poll);
handleIndicies.push_back(AuMakePair(i, false)); HANDLE_PUSH_CHILD2(AuMakePair(i, false));
} }
if (handleWrite != -1) if (handleWrite != -1)
@ -87,8 +155,8 @@ namespace Aurora::IO::Loop
poll.events = POLLOUT; poll.events = POLLOUT;
poll.revents = 0; poll.revents = 0;
handleArray.push_back(poll); HANDLE_PUSH_CHILD(poll);
handleIndicies.push_back(AuMakePair(i, true)); HANDLE_PUSH_CHILD2(AuMakePair(i, false));
} }
} }
else else
@ -96,8 +164,8 @@ namespace Aurora::IO::Loop
auto handles = extended->GetHandles(); auto handles = extended->GetHandles();
auto handlesWrite = extended->GetWriteHandles(); auto handlesWrite = extended->GetWriteHandles();
auto i = loopSourceExs.size(); auto i = uHandleOffset2++;
loopSourceExs.push_back(extended); HANDLE_PUSH_MAIN(extended, i);
for (const auto &handle : handles) for (const auto &handle : handles)
{ {
@ -111,8 +179,8 @@ namespace Aurora::IO::Loop
poll.events = POLLIN; poll.events = POLLIN;
poll.revents = 0; poll.revents = 0;
handleArray.push_back(poll); HANDLE_PUSH_CHILD(poll);
handleIndicies.push_back(AuMakePair(i, false)); HANDLE_PUSH_CHILD2(AuMakePair(i, false));
} }
for (const auto &handle : handlesWrite) for (const auto &handle : handlesWrite)
@ -127,8 +195,8 @@ namespace Aurora::IO::Loop
poll.events = POLLOUT; poll.events = POLLOUT;
poll.revents = 0; poll.revents = 0;
handleArray.push_back(poll); HANDLE_PUSH_CHILD(poll);
handleIndicies.push_back(AuMakePair(i, true)); HANDLE_PUSH_CHILD2(AuMakePair(i, false));
} }
} }
} }
@ -139,9 +207,10 @@ namespace Aurora::IO::Loop
return {}; return {};
} }
for (const auto &source : loopSourceExs2) // must be 1:1 - do not mess with
for (AU_ITERATE_N(i, uHandleOffset))
{ {
source->OnPresleep(); pLoopSourceExs2[i]->OnPresleep();
} }
AuUInt32 uTimeout {}; AuUInt32 uTimeout {};
@ -161,29 +230,30 @@ namespace Aurora::IO::Loop
int ret; int ret;
do do
{ {
ret = poll(handleArray.data(), handleArray.size(), uTimeout); ret = poll(pHandleArray, uFDOffset, uTimeout);
uTimeout = 0; uTimeout = 0;
if (ret > 0) if (ret > 0)
{ {
for (AU_ITERATE_N(i, handleArray.size())) for (AU_ITERATE_N(i, uFDOffset))
{ {
if (!handleArray[i].revents) if (!pHandleArray[i].revents)
{ {
continue; continue;
} }
auto uIndex = AuGet<0>(handleIndicies[i]); auto uIndex = AuGet<0>(pHandleIndicies[i]);
auto bRead = AuGet<1>(handleIndicies[i]); auto bRead = AuGet<1>(pHandleIndicies[i]);
auto pLoopSource = AuExchange(loopSourceExs[uIndex], nullptr); auto pLoopSource = AuExchange(pLoopSourceExs1[uIndex], nullptr);
if (!pLoopSource) if (!pLoopSource)
{ {
// TODO: notify other? special types go here?
continue; continue;
} }
if (!pLoopSource->OnTrigger(handleArray[i].fd)) if (!pLoopSource->OnTrigger(pHandleArray[i].fd))
{ {
loopSourceExs[uIndex] = pLoopSource; pLoopSourceExs1[uIndex] = pLoopSource;
continue; continue;
} }
@ -199,9 +269,10 @@ namespace Aurora::IO::Loop
while (ret == -1 && while (ret == -1 &&
errno == EINTR); errno == EINTR);
for (const auto &source : loopSourceExs2) // must be 1:1 - do not mess with
for (AU_ITERATE_N(i, uHandleOffset))
{ {
source->OnFinishSleep(); pLoopSourceExs2[i]->OnFinishSleep();
} }
// TODO: ugly workaround (see: LSFromHdNonblocking rationale) for an ugly TODO issue implicating all targets (see public Loop.hpp) // TODO: ugly workaround (see: LSFromHdNonblocking rationale) for an ugly TODO issue implicating all targets (see public Loop.hpp)
@ -214,7 +285,7 @@ namespace Aurora::IO::Loop
#endif #endif
} }
return triggered; return AuMove(triggered);
#endif #endif
} }
} }