[+] Experimental AuThreading::TryWaitOnAddressUntilEqualEx, WaitOnAddressUntilEqual, WaitOnAddressUntilEqualSteady

This commit is contained in:
Reece Wilson 2024-03-02 23:23:16 +00:00
parent 27977779a9
commit 3004c3de19
3 changed files with 257 additions and 21 deletions

View File

@ -30,6 +30,10 @@
* bPreferEmulatedWakeOnAddress disables the emulation layer, if theres a reasonable native
* interface available.
* Defer to ThreadingConfig::bPreferEmulatedWakeOnAddress = !AuBuild::kIsNtDerived
Note: UntilEqual (new experimental) variants yield until a specified pCompareAddress value.
The base variants treat pCompareAddress as the previous CAS return value.
***/
#pragma once
@ -51,6 +55,10 @@ namespace Aurora::Threading
const void *pCompareAddress,
AuUInt8 uWordSize);
AUKN_SYM bool TryWaitOnAddressUntilEqual(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize);
// On systems with processors of shared execution pipelines, these try-series of operations will spin (eg: mm_pause) for a configurable
// amount of time, so long as the the process-wide state isn't overly contested. This means you can use these arbitrarily without
// worrying about an accidental thundering mm_pause herd. If you wish to call WaitOnAddress[...] afterwards, you should report you already
@ -62,12 +70,24 @@ namespace Aurora::Threading
AuUInt8 uWordSize,
const AuFunction<bool(const void *, const void *, AuUInt8)> &check);
AUKN_SYM bool TryWaitOnAddressUntilEqualEx(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
const AuFunction<bool(const void *, const void *, AuUInt8)> &check);
// Relative timeout variant of nanosecond resolution WoA. 0 = indefinite
AUKN_SYM bool WaitOnAddress(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
AuUInt64 qwNanoseconds,
AuOptional<bool> optAlreadySpun = {} /*hint: do not spin before switching. subject to global config.*/);
// Relative timeout variant of nanosecond resolution WoA. 0 = indefinite
AUKN_SYM bool WaitOnAddressUntilEqual(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
AuUInt64 qwNanoseconds,
AuOptional<bool> optAlreadySpun = {} /*hint: do not spin before switching. subject to global config.*/);
// Absolute timeout variant of nanosecond resolution WoA. Nanoseconds are in steady clock time. 0 = indefinite
AUKN_SYM bool WaitOnAddressSteady(const void *pTargetAddress,
@ -75,4 +95,12 @@ namespace Aurora::Threading
AuUInt8 uWordSize,
AuUInt64 qwNanoseconds,
AuOptional<bool> optAlreadySpun = {} /*hint: do not spin before switching. subject to global config.*/);
// Absolute timeout variant of nanosecond resolution WoA. Nanoseconds are in steady clock time. 0 = indefinite
AUKN_SYM bool WaitOnAddressUntilEqualSteady(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
AuUInt64 qwNanoseconds,
AuOptional<bool> optAlreadySpun = {} /*hint: do not spin before switching. subject to global config.*/);
}

View File

@ -107,8 +107,6 @@ namespace Aurora::Threading
while (uNow < uEndTime)
{
FlushWaitBufferPAddressCache();
if (!WaitBuffer::Compare(this->pAddress, this->uSize, state))
{
return true;
@ -123,7 +121,7 @@ namespace Aurora::Threading
#if !defined(WOA_SEMAPHORE_MODE)
this->mutex.Unlock();
#endif
(void)gProcessWaitables.WaitBufferFrom(this->pAddress, this->uSize, false);
(void)gProcessWaitables.WaitBufferFrom(this->pAddress, this->uSize, false, state.pCompare2);
#if !defined(WOA_SEMAPHORE_MODE)
this->mutex.Lock();
#endif
@ -152,7 +150,7 @@ namespace Aurora::Threading
#if !defined(WOA_SEMAPHORE_MODE)
this->mutex.Unlock();
#endif
(void)gProcessWaitables.WaitBufferFrom(this->pAddress, this->uSize, false);
(void)gProcessWaitables.WaitBufferFrom(this->pAddress, this->uSize, false, state.pCompare2);
#if !defined(WOA_SEMAPHORE_MODE)
this->mutex.Lock();
#endif
@ -165,8 +163,6 @@ namespace Aurora::Threading
this->variable.WaitForSignalNsEx(&this->mutex, 0, false);
#endif
}
FlushWaitBufferPAddressCache();
}
return true;
@ -182,6 +178,14 @@ namespace Aurora::Threading
return false;
}
if (this->pCompareAddress)
{
if (!WaitBuffer::Compare(pAddress, this->uSize, this->pCompareAddress))
{
return false;
}
}
#if defined(WOA_SEMAPHORE_MODE)
this->semaphore->Unlock(1);
#else
@ -200,9 +204,12 @@ namespace Aurora::Threading
bool WaitBuffer::Compare(const void *pBuf, AuUInt8 uSize, WaitState &state)
{
bool bRet {};
FlushWaitBufferPAddressCache();
if (!state.uDownsizeMask)
{
return AuMemcmp(pBuf, state.compare.buffer, AuMin(uSize, state.compare.uSize)) == 0;
bRet = AuMemcmp(pBuf, state.compare.buffer, AuMin(uSize, state.compare.uSize)) == 0;
}
else
{
@ -211,12 +218,17 @@ namespace Aurora::Threading
auto &uSrcWord = *AuReinterpretCast<const AuUInt32 *>(pBuf);
auto &uCmpWord = *AuReinterpretCast<const AuUInt32 *>(state.compare.buffer);
return (uSrcWord & uMask) == (uCmpWord & uMask);
bRet = (uSrcWord & uMask) == (uCmpWord & uMask);
}
bRet ^= bool(state.pCompare2);
return bRet;
}
bool WaitBuffer::Compare(const void *pBuf, AuUInt8 uSize, const void *pBuf2)
{
FlushWaitBufferPAddressCache();
switch (uSize)
{
case 1:
@ -239,9 +251,11 @@ namespace Aurora::Threading
bool WaitBuffer::Compare(WaitState &state)
{
bool bRet {};
if (!state.uDownsizeMask)
{
return WaitBuffer::Compare(this->buffer, AuMin(this->uSize, state.compare.uSize), state.compare.buffer);
bRet = WaitBuffer::Compare(this->buffer, AuMin(this->uSize, state.compare.uSize), state.compare.buffer);
}
else
{
@ -250,11 +264,14 @@ namespace Aurora::Threading
auto &uSrcWord = *AuReinterpretCast<const AuUInt32 *>(this->buffer);
auto &uCmpWord = *AuReinterpretCast<const AuUInt32 *>(state.compare.buffer);
return (uSrcWord & uMask) == (uCmpWord & uMask);
bRet = (uSrcWord & uMask) == (uCmpWord & uMask);
}
bRet ^= bool(state.pCompare2);
return bRet;
}
WaitEntry *ProcessWaitNodeContainer::WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst)
WaitEntry *ProcessWaitNodeContainer::WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst, const void *pCompareAddress)
{
#if defined(HACK_NO_INVALID_ACCESS_LEAK_SHARED_REF_ON_DESTROYED_THREAD)
auto pReturn = tlsWaitEntry.get();
@ -262,8 +279,9 @@ namespace Aurora::Threading
auto pReturn = &tlsWaitEntry;
#endif
pReturn->pAddress = pAddress;
pReturn->uSize = uSize;
pReturn->pAddress = pAddress;
pReturn->uSize = uSize;
pReturn->pCompareAddress = pCompareAddress;
if (bScheduleFirst /*First in, First Out*/)
{
@ -416,9 +434,9 @@ namespace Aurora::Threading
#define AddressToIndex AuHashCode(pAddress) & (AuArraySize(this->list) - 1)
WaitEntry *ProcessWaitContainer::WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst)
WaitEntry *ProcessWaitContainer::WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst, const void *pCompareAddress)
{
return this->list[AddressToIndex].WaitBufferFrom(pAddress, uSize, bScheduleFirst);
return this->list[AddressToIndex].WaitBufferFrom(pAddress, uSize, bScheduleFirst, pCompareAddress);
}
template <typename T>
@ -483,13 +501,23 @@ namespace Aurora::Threading
AuUInt8 uWordSize,
AuOptional<AuUInt64> qwNanoseconds,
AuOptional<AuUInt64> qwNanosecondsAbs,
bool bOSSupportsWait
bool bOSSupportsWait,
const void *pCompareAddress2
)
{
WaitState state;
SysAssertDbg(uWordSize <= 32);
auto pWaitEntry = gProcessWaitables.WaitBufferFrom(pTargetAddress, uWordSize, true);
state.compare = WaitBuffer::From(pCompareAddress, uWordSize);
auto pWaitEntry = gProcessWaitables.WaitBufferFrom(pTargetAddress, uWordSize, true, pCompareAddress2);
// Unlocked update to a safer comparison address; hardens against bad code
{
state.compare = WaitBuffer::From(pCompareAddress, uWordSize);
// Replace from pCompareAddress2 to our own memory to harden against bad volatile comparison pointers
pWaitEntry->pCompareAddress = state.pCompare2 =
pCompareAddress2 ? state.compare.buffer : nullptr;
}
if (qwNanoseconds)
{
@ -846,6 +874,66 @@ namespace Aurora::Threading
return !WaitBuffer::Compare(pTargetAddress, state.uWordSize, state);
}
static void RunOSWakeNOnAddress(const void *pAddress,
AuUInt32 dwCount);
static void RunOSWaitOnAddressEQNoTimedNoErrors(const void *pTargetAddress,
const void *pCompareAddress,
WaitState &state)
{
while (true)
{
WaitBuffer wb = WaitBuffer::From(pTargetAddress, state.uWordSize);
if (!wb.Compare(state))
{
return;
}
(void)RunOSWaitOnAddressNoTimed(pTargetAddress, wb.buffer, state.uWordSize);
if (WaitBuffer::Compare(pTargetAddress, state.uWordSize, state))
{
RunOSWakeNOnAddress(pTargetAddress, 1);
}
else
{
return;
}
}
}
static bool RunOSWaitOnAddressEQTimedSteady(const void *pTargetAddress,
const void *pCompareAddress,
WaitState &state,
bool bSpun = false)
{
while (true)
{
WaitBuffer wb = WaitBuffer::From(pTargetAddress, state.uWordSize);
if (!wb.Compare(state))
{
return true;
}
bool bResult = RunOSWaitOnAddressTimed(pTargetAddress, wb.buffer, state.uWordSize, state.qwNanosecondsAbs.value(), { }, { }, bSpun);
if (WaitBuffer::Compare(pTargetAddress, state.uWordSize, state))
{
RunOSWakeNOnAddress(pTargetAddress, 1);
if (!bResult)
{
return false;
}
}
else
{
return true;
}
}
}
static void RunOSWakeNOnAddress(const void *pAddress,
AuUInt32 dwCount)
{
@ -976,6 +1064,25 @@ namespace Aurora::Threading
optAlreadySpun);
}
AUKN_SYM bool WaitOnAddressUntilEqual(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
AuUInt64 qwNanoseconds,
AuOptional<bool> optAlreadySpun)
{
// Avoid SteadyTime syscall in the event of HAL retardation (missing KUSER QPC, Linux vDSO, etc)
if (WaitBuffer::Compare(pTargetAddress, uWordSize, pCompareAddress))
{
return true;
}
return WaitOnAddressUntilEqualSteady(pTargetAddress,
pCompareAddress,
uWordSize,
qwNanoseconds ? qwNanoseconds + AuTime::SteadyClockNS() : 0,
optAlreadySpun);
}
AUKN_SYM bool TryWaitOnAddress(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize)
@ -986,6 +1093,16 @@ namespace Aurora::Threading
});
}
AUKN_SYM bool TryWaitOnAddressUntilEqual(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize)
{
return Primitives::DoTryIf([&]()
{
return WaitBuffer::Compare(pCompareAddress, uWordSize, pTargetAddress);
});
}
AUKN_SYM bool TryWaitOnAddressEx(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
@ -1007,6 +1124,27 @@ namespace Aurora::Threading
});
}
AUKN_SYM bool TryWaitOnAddressUntilEqualEx(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
const AuFunction<bool(const void *, const void *, AuUInt8)> &check)
{
if (!check)
{
return TryWaitOnAddressUntilEqual(pTargetAddress, pCompareAddress, uWordSize);
}
return Primitives::DoTryIf([&]()
{
if (!WaitBuffer::Compare(pCompareAddress, uWordSize, pTargetAddress))
{
return false;
}
return check(pTargetAddress, pCompareAddress, uWordSize);
});
}
AUKN_SYM void WakeNOnAddress(const void *pTargetAddress,
AuUInt8 uNMaximumThreads)
{
@ -1119,7 +1257,75 @@ namespace Aurora::Threading
}
}
return WaitOnAddressWide(pTargetAddress, pCompareAddress, uWordSize, {}, qwNanoseconds ? qwNanoseconds : AuOptional<AuUInt64>{}, false);
return WaitOnAddressWide(pTargetAddress, pCompareAddress, uWordSize, {}, qwNanoseconds ? qwNanoseconds : AuOptional<AuUInt64>{}, false, nullptr);
}
return false;
}
AUKN_SYM bool WaitOnAddressUntilEqualSteady(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
AuUInt64 qwNanoseconds,
AuOptional<bool> optAlreadySpun)
{
// Avoid emulated path dynamic TLS fetch without TLS section
// or various security checks
// or other such bloated thunks
if (WaitBuffer::Compare(pCompareAddress, uWordSize, pTargetAddress))
{
return true;
}
bool bWaitOnAddress = IsWaitOnRecommended();
if (bWaitOnAddress)
{
auto [pWaitAddress, uDelta, uMask] = DecodeAddress(pTargetAddress, uWordSize);
auto pCompareAddress2 = AuReinterpretCast<const char *>(pCompareAddress) - uDelta;
WaitState state;
state.uDownsizeMask = uMask;
state.compare = uMask ?
WaitBuffer::From(pCompareAddress2, 4) :
WaitBuffer::From(pCompareAddress2, uWordSize);
state.uWordSize = uMask ? 4 : uWordSize;
state.pCompare2 = pCompareAddress;
bool bSpun {};
if (Primitives::ThrdCfg::gPreferWaitOnAddressAlwaysSpinNative &&
optAlreadySpun.value_or(false))
{
if (TryWaitOnAddressUntilEqual(pTargetAddress, pCompareAddress, uWordSize))
{
return true;
}
bSpun = true;
}
if (!qwNanoseconds)
{
RunOSWaitOnAddressEQNoTimedNoErrors(pWaitAddress, pCompareAddress2, state);
return true;
}
else
{
state.qwNanosecondsAbs = qwNanoseconds;
return RunOSWaitOnAddressEQTimedSteady(pWaitAddress, pCompareAddress2, state, bSpun);
}
}
else
{
if (Primitives::ThrdCfg::gPreferWaitOnAddressAlwaysSpin &&
optAlreadySpun.value_or(false))
{
if (TryWaitOnAddressUntilEqual(pTargetAddress, pCompareAddress, uWordSize))
{
return true;
}
}
return WaitOnAddressWide(pTargetAddress, pCompareAddress, uWordSize, {}, qwNanoseconds ? qwNanoseconds : AuOptional<AuUInt64>{}, false, pCompareAddress);
}
return false;

View File

@ -39,6 +39,7 @@ namespace Aurora::Threading
AuOptionalEx<AuUInt64> qwNanosecondsAbs;
AuOptionalEx<AuUInt32> uDownsizeMask;
AuUInt32 uWordSize {};
const void *pCompare2 {};
};
struct WaitEntry
@ -78,6 +79,7 @@ namespace Aurora::Threading
// state
const void *pAddress {};
AuUInt8 uSize {};
const void *pCompareAddress {};
// bookkeeping (parent container)
volatile bool bAlive {}; // wait entry validity. must be rechecked for each spurious or expected wake, if the comparison doesn't break the yield loop.
@ -99,7 +101,7 @@ namespace Aurora::Threading
AuUInt32 uAtomic {};
ProcessListWait waitList;
WaitEntry *WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst);
WaitEntry *WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst, const void *pAddressCompare);
template <typename T>
bool IterateWake(T callback);
@ -116,7 +118,7 @@ namespace Aurora::Threading
{
ProcessWaitNodeContainer list[kDefaultWaitPerProcess];
WaitEntry *WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst = true);
WaitEntry *WaitBufferFrom(const void *pAddress, AuUInt8 uSize, bool bScheduleFirst, const void *pAddressCompare);
template <typename T>
bool IterateWake(const void *pAddress, T callback);