[+] AuThreading::WaitForMultipleAddressesOrWithIO (Mixed userspace monitors with IO primitives)

[+] AuThreading::WaitMultipleIOTrigger
[+] AuThreading::WaitMulipleContainer
Reece Wilson 2024-12-12 17:29:36 +00:00
parent da928a44c8
commit 1013f985aa
3 changed files with 250 additions and 10 deletions


@@ -95,6 +95,11 @@
***/
#pragma once
namespace Aurora::IO::Loop
{
struct ILoopSource;
}
namespace Aurora::Threading
{
// Specifies to break a thread context yield when volatile pTargetAddress [... EWaitMethod operation ...] constant pCompareAddress
@@ -102,43 +107,68 @@ namespace Aurora::Threading
eNotEqual, eEqual, eLessThanCompare, eGreaterThanCompare, eLessThanOrEqualsCompare, eGreaterThanOrEqualsCompare, eAnd, eNotAnd
))
// User-provided wait head for allocation-less wait on multiple addresses
struct AU_ALIGN(sizeof(void *)) WaitMultipleEntry
{
AuUInt8 internalContext[128];
// See WaitOnAddressSpecialSteady
EWaitMethod eMethod { EWaitMethod::eNotEqual };
// See WaitOnAddressSteady
union
{
const void * pTargetAddress;
const volatile void * pTargetVolatileAddress;
};
// See WaitOnAddressSteady
const void * pCompareAddress;
// See WaitOnAddressSteady
AuUInt8 uSize {};
// For each valid state change, this counter gets incremented by 1, allowing for list reuse.
AuUInt16 uHasStateChangedCounter {};
// Skip this current entry
bool bIgnoreCurrentFlag {};
};
// User-provided container of WaitMultipleEntry for allocation-less wait on multiple addresses
struct WaitMulipleContainer
{
// Assign this to a virtually contiguous array of WaitMultipleEntry entries.
// A AuList, std::vector, WaitMultipleHead extent[N], or { raw pointer, count } will suffice.
AuMemoryViewWrite waitArray;
// 0 = indefinite, AuTime::SteadyClockXXX convention
AuUInt64 qwNanoseconds {};
};
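A minimal usage sketch of the address-only container, for orientation (not part of this commit; the flag variables, zero constant, deadline, and the pointer/extent view construction are illustrative assumptions):
AuUInt32 uFlagA {}, uFlagB {};
const AuUInt32 kZero {};
// Two entries using the default eNotEqual method: wake once either flag differs from kZero.
AuThreading::WaitMultipleEntry entries[2] {};
entries[0].pTargetAddress = &uFlagA;
entries[0].pCompareAddress = &kZero;
entries[0].uSize = sizeof(uFlagA);
entries[1].pTargetAddress = &uFlagB;
entries[1].pCompareAddress = &kZero;
entries[1].uSize = sizeof(uFlagB);
AuThreading::WaitMulipleContainer container;
container.waitArray = AuMemoryViewWrite { entries, sizeof(entries) }; // assumed pointer + extent form; see the waitArray comment above
container.qwNanoseconds = AuTime::SteadyClockNS() + 2'000'000'000ull; // absolute steady-clock deadline, roughly two seconds out
bool bAnyChanged = AuThreading::WaitForMultipleAddressesOr(container); // declared further down in this header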
// User-provided node for additional AuLoop-based IO objects to use in an alternative non-futex/non-keyedevent/non-waitonaddress path.
struct WaitMultipleIOTrigger
{
AuSPtr<IO::Loop::ILoopSource> pLoopSource;
bool bIgnoreCurrentFlag {};
AuUInt16 uHasStateChangedCounter {};
};
// User-provided container of WaitMultipleIOTrigger entries.
struct WaitMulipleContainerWithIO :
WaitMulipleContainer
{
// array of WaitMultipleIOTrigger
AuMemoryViewWrite lsArray;
// Break after one trigger?
bool bWaitOnlyOne {};
// Optional output parameter: index to the first triggered IO object
AuUInt32 * pTriggeredIndex {};
};
AUKN_SYM void WakeAllOnAddress(const void *pTargetAddress);
AUKN_SYM void WakeOnAddress(const void *pTargetAddress);
@@ -225,6 +255,9 @@ namespace Aurora::Threading
AUKN_SYM bool WaitForMultipleAddressesAnd(const WaitMulipleContainer &waitMultipleOnAddress);
// Mixed IO (FD/Handle/similar) with user-space waitlists. Functionality is a superset of WaitForMultipleAddressesOr
AUKN_SYM bool WaitForMultipleAddressesOrWithIO(const WaitMulipleContainerWithIO &waitMultipleOnAddress);
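A hedged sketch of the new mixed wait (again, not part of this commit; pSomeLoopSource stands in for any existing AuSPtr<IO::Loop::ILoopSource>, and the view construction is assumed as above):
AuUInt32 uFlag {};
const AuUInt32 kZero {};
AuThreading::WaitMultipleEntry addressEntries[1] {};
addressEntries[0].pTargetAddress = &uFlag;
addressEntries[0].pCompareAddress = &kZero; // default eNotEqual: wake once uFlag != 0
addressEntries[0].uSize = sizeof(uFlag);
AuThreading::WaitMultipleIOTrigger ioEntries[1] {};
ioEntries[0].pLoopSource = pSomeLoopSource; // hypothetical loop source (timer, socket, etc.)
AuUInt32 uFirstIOIndex {};
AuThreading::WaitMulipleContainerWithIO container;
container.waitArray = AuMemoryViewWrite { addressEntries, sizeof(addressEntries) };
container.lsArray = AuMemoryViewWrite { ioEntries, sizeof(ioEntries) };
container.qwNanoseconds = 0; // 0 = wait indefinitely
container.bWaitOnlyOne = true; // return after the first trigger
container.pTriggeredIndex = &uFirstIOIndex; // receives the index of the first triggered IO entry
bool bTriggered = AuThreading::WaitForMultipleAddressesOrWithIO(container);
// Afterwards, uHasStateChangedCounter on each address/IO entry indicates what fired.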
// C++ doesn't allow for implicit casting between nonvolatile and volatile pointers.
// The following stubs unify the above APIs for non-volatile marked atomic containers.
// Whether the underlying data of "pTargetAddress" is thread-locally-volatile or not is upto the chosen compiler intrin used to load/store and/or whether you upcast to volatile later on.


@@ -417,6 +417,12 @@ namespace Aurora::Threading
return true;
}
}
// Also signal the loop-source semaphore so a thread parked in WaitForMultipleAddressesOrWithIO's IO wait gets released.
if (this->bSemaphoreActive &&
this->pSemaphore)
{
this->pSemaphore->AddOne();
}
}
else if (this->pCompareAddress)
{
@@ -2135,4 +2141,202 @@ namespace Aurora::Threading
return bStatus;
}
AUKN_SYM bool WaitForMultipleAddressesOrWithIO(const WaitMulipleContainerWithIO &waitMultipleOnAddress)
{
AuLoop::WaitForMultipleLoopSourcesParameters parameters;
AuList<AuSPtr<AuLoop::ILoopSource>> loopSourceVec;
AuList<AuLoop::WaitForMultipleLoopSourcesInOutOpt> outVec;
bool bResult {}, bAny {}, bSleepStatus {};
AuUInt32 uTicks {};
WaitEntry *pWaitEntryMain {}, *pWaitEntryAux {};
SysAssertDbg(!IsWaitOnRecommended(), "WoA not in emulation mode");
auto uCountOfIO = waitMultipleOnAddress.lsArray.Count<WaitMultipleIOTrigger>();
auto pBaseOfIO = waitMultipleOnAddress.lsArray.Begin<WaitMultipleIOTrigger>();
if (waitMultipleOnAddress.pTriggeredIndex)
{
*waitMultipleOnAddress.pTriggeredIndex = AuUInt32(-1);
}
if (!AuTryResize(loopSourceVec, uCountOfIO + 1))
{
SysPushErrorMemory();
return false;
}
if (!AuTryResize(outVec, uCountOfIO + 1))
{
SysPushErrorMemory();
return false;
}
// Slot 0 is reserved for this thread's wake semaphore; copy the caller's IO triggers into slots 1..N.
for (AU_ITERATE_N(i, uCountOfIO))
{
loopSourceVec[i + 1] = pBaseOfIO[i].pLoopSource;
outVec [i + 1].bSkip = pBaseOfIO[i].bIgnoreCurrentFlag;
}
auto uCount = waitMultipleOnAddress.waitArray.Count<WaitMultipleEntry>();
auto pBase = waitMultipleOnAddress.waitArray.Begin<WaitMultipleEntry>();
#if defined(HACK_NO_INVALID_ACCESS_LEAK_SHARED_REF_ON_DESTROYED_THREAD)
auto pTempHoldMe = tlsWaitEntry;
auto pHead = pTempHoldMe.get();
#else
auto pHead = &tlsWaitEntry;
#endif
// Lazily attach a loop-source semaphore to this thread's wait entry and place it in slot 0;
// userspace address wakes signal it (see the bSemaphoreActive hunk above), letting them interrupt the IO wait.
if (!pHead->pSemaphore)
{
pHead->pSemaphore = AuLoop::NewLSSemaphoreSlow(0u);
}
pHead->bSemaphoreActive = true;
loopSourceVec[0] = pHead->pSemaphore;
outVec [0].bSkip = false;
parameters.bIsVectorShared = true;
parameters.uFlags = AuLoop::kWaitMultipleFlagAny;
if (waitMultipleOnAddress.bWaitOnlyOne)
{
parameters.uFlags |= AuLoop::kWaitMultipleFlagBreakAfterOne;
}
parameters.vecArray = loopSourceVec;
parameters.vecArray2 = outVec;
do
{
// Link this thread onto the wait list of each non-ignored address entry before sleeping.
for (AU_ITERATE_N(i, uCount))
{
auto &current = pBase[i];
auto pCurrent = AuReinterpretCast<MultipleInternalContext>(pBase[i].internalContext);
auto &state = pCurrent->state;
if (current.bIgnoreCurrentFlag)
{
continue;
}
pCurrent->pBefore = nullptr;
pCurrent->pNext = nullptr;
pCurrent->uMinTrigger = 0;
pCurrent->uCounter = 0;
pWaitEntryAux = gProcessWaitables.WaitBufferFrom2(current.pTargetAddress, current.uSize, current.pCompareAddress, current.eMethod, pCurrent, &waitMultipleOnAddress);
if (!pWaitEntryAux)
{
break;
}
else
{
pWaitEntryMain = pWaitEntryAux;
}
state.qwNanosecondsAbs = waitMultipleOnAddress.qwNanoseconds;
bAny = true;
}
if (!bAny && !uCountOfIO)
{
return true;
}
if (!pWaitEntryAux)
{
bSleepStatus = false;
}
else if (!AuAtomicLoad(&pWaitEntryAux->bAlive))
{
bSleepStatus = false;
}
else
{
// Convert the absolute steady-clock deadline (qwNanoseconds) into a relative millisecond timeout for the loop wait; 0 means wait indefinitely.
auto uNow = AuTime::SteadyClockNS();
bool bTimeOutEarly {};
if (!waitMultipleOnAddress.qwNanoseconds)
{
parameters.optTimeoutMS = {};
}
else if (uNow >= waitMultipleOnAddress.qwNanoseconds)
{
if (uTicks)
{
bTimeOutEarly = true;
}
else
{
parameters.optTimeoutMS = 0;
}
}
else
{
parameters.optTimeoutMS = AuNSToMS<AuUInt32>(waitMultipleOnAddress.qwNanoseconds - uNow);
}
if (bTimeOutEarly)
{
bSleepStatus = false;
}
else
{
bSleepStatus = AuLoop::WaitMultipleLoopSources2(parameters);
}
}
// After waking (or timing out), re-test each address condition and unlink this thread from its wait lists.
for (AU_ITERATE_N(i, uCount))
{
auto &current = pBase[i];
auto pCurrent = AuReinterpretCast<MultipleInternalContext>(pBase[i].internalContext);
auto &state = pCurrent->state;
if (current.bIgnoreCurrentFlag)
{
continue;
}
if (!WaitBuffer::Compare(current.pTargetAddress, current.uSize, current.pCompareAddress, kMax64, current.eMethod))
{
current.uHasStateChangedCounter++;
bResult = true;
}
gProcessWaitables.RemoveSelf(current.pTargetAddress, pWaitEntryMain);
}
// Propagate loop-source trigger counts back to the caller's IO trigger entries.
for (AU_ITERATE_N(i, uCountOfIO))
{
auto &refTriggered = outVec[i + 1].uTriggered;
if (refTriggered)
{
pBaseOfIO[i].uHasStateChangedCounter += refTriggered;
refTriggered = 0;
bResult = true;
if (waitMultipleOnAddress.pTriggeredIndex)
{
*waitMultipleOnAddress.pTriggeredIndex = i;
}
}
}
uTicks++;
}
while (!bResult && (!waitMultipleOnAddress.qwNanoseconds || bSleepStatus));
pHead->bSemaphoreActive = false;
#if defined(HACK_NO_INVALID_ACCESS_LEAK_SHARED_REF_ON_DESTROYED_THREAD)
pTempHoldMe.reset();
#endif
return bResult;
}
}


@@ -132,6 +132,9 @@ namespace Aurora::Threading
auline WaitEntry *GetBefore(const void *pAddress);
auline WaitEntry *GetSimilarFirstItr(const void *pAddress);
auline void SetBefore(const void *pAddress, WaitEntry *pNext);
// Loop-source semaphore used by WaitForMultipleAddressesOrWithIO to bridge userspace address wakes into the IO wait
AuSPtr<AuLoop::ILSSemaphore> pSemaphore;
volatile bool bSemaphoreActive {};
};
struct ProcessListWait