AuroraRuntime/Source/Threading/AuWakeOnAddress.cpp

767 lines
21 KiB
C++

/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuWakeOnAddress.cpp
Date: 2023-3-10
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuWakeOnAddress.hpp"
#include "Primitives/SMTYield.hpp"
namespace Aurora::Threading
{
#if defined(AURORA_IS_LINUX_DERIVED)
static int futex_wait(uint32_t *addr, uint32_t expected, const struct timespec *timeout)
{
if (timeout)
{
return futex(addr, FUTEX_WAIT_BITSET, expected, timeout, 0, FUTEX_BITSET_MATCH_ANY);
}
else
{
return futex(addr, FUTEX_WAIT, expected, timeout, 0, 0);
}
}
static int futex_wake(uint32_t *addr, uint32_t nthreads)
{
return futex(addr, FUTEX_WAKE, nthreads, 0, 0, 0);
}
#endif
static ProcessWaitContainer gProcessWaitables;
bool WaitEntry::TryAcquire(const void *pAddress, AuUInt8 uSize)
{
while (AuAtomicTestAndSet(&this->uAtomic, 0))
{
while (this->uAtomic)
{
AuThreading::ContextYield();
}
}
//AU_LOCK_GUARD(this->mutex);
if (this->pAddress)
{
this->uAtomic = 0;
return false;
}
this->pAddress = pAddress;
this->uSize = uSize;
this->uAtomic = 0;
return true;
}
void WaitEntry::Release()
{
if (this->bOverflow)
{
gProcessWaitables.Remove(this);
this->bOverflow = false;
}
AuResetMember(this->uSize);
AuResetMember(this->pAddress);
}
WaitEntry::WaitEntry() :
variable(AuUnsafeRaiiToShared(&this->mutex))
{
}
WaitEntry::~WaitEntry()
{
this->Release();
}
bool WaitEntry::SleepOn(WaitState &state)
{
AU_LOCK_GUARD(this->mutex);
if (state.qwNanoseconds)
{
if (!WaitBuffer::From(this->pAddress, this->uSize).Compare(state))
{
return true;
}
auto uNow = AuTime::SteadyClockNS();
auto uEndTime = uNow + state.qwNanoseconds.value();
#if defined(AURORA_IS_POSIX_DERIVED)
struct timespec tspec;
Time::auabsns2ts(&tspec, uEndTime);
#endif
while (uNow < uEndTime)
{
if (!WaitBuffer::From(this->pAddress, this->uSize).Compare(state))
{
return true;
}
auto uTimeRemNS = uEndTime - uNow;
#if defined(AURORA_IS_POSIX_DERIVED)
auto pCond = reinterpret_cast<pthread_cond_t *>(&this->variable.pthreadCv_);
auto mutex = reinterpret_cast<pthread_mutex_t *>(this->mutex->GetOSHandle());
int ret {};
do
{
ret = ::pthread_cond_timedwait(pCond, mutex, &tspec);
if (ret == 0)
{
break;
}
if (ret == ETIMEDOUT)
{
return !WaitBuffer::From(this->pAddress, this->uSize).Compare(state);
}
}
while (ret == EINTR);
#else
this->variable.WaitForSignalNS(uTimeRemNS);
#endif
uNow = AuTime::SteadyClockNS();
}
return !WaitBuffer::From(this->pAddress, this->uSize).Compare(state);
}
else
{
while (WaitBuffer::From(this->pAddress, this->uSize).Compare(state))
{
this->variable.WaitForSignal(0);
}
return true;
}
return false;
}
bool WaitEntry::TryWake(const void *pAddress)
{
while (AuAtomicTestAndSet(&this->uAtomic, 0))
{
while (this->uAtomic)
{
AuThreading::ContextYield();
}
}
auto bRet = TryWakeNoLock(pAddress);
if (!bRet)
{
this->uAtomic = 0;
}
return bRet;
}
bool WaitEntry::TryWakeNoLockNoReallyNoLock(const void *pAddress)
{
if (AuReinterpretCast<const char *>(this->pAddress) > AuReinterpretCast<const char *>(pAddress) ||
AuReinterpretCast<const char *>(this->pAddress) + this->uSize <= AuReinterpretCast<const char *>(pAddress))
{
return false;
}
this->uAtomic = 0;
this->variable.Signal();
return true;
}
bool WaitEntry::TryWakeNoLock(const void *pAddress)
{
if (AuReinterpretCast<const char *>(this->pAddress) > AuReinterpretCast<const char *>(pAddress) ||
AuReinterpretCast<const char *>(this->pAddress) + this->uSize <= AuReinterpretCast<const char *>(pAddress))
{
return false;
}
AU_LOCK_GUARD(this->mutex);
this->uAtomic = 0;
this->variable.Signal();
return true;
}
WaitBuffer WaitBuffer::From(const void *pBuf, AuUInt8 uSize)
{
WaitBuffer wait;
AuMemcpy(wait.buffer, pBuf, uSize);
wait.uSize = uSize;
return AuMove(wait);
}
bool WaitBuffer::Compare(const void *pBuf)
{
return AuMemcmp(this->buffer, pBuf, this->uSize) == 0;
}
bool WaitBuffer::Compare(WaitState &state)
{
if (!state.uDownsizeMask)
{
return AuMemcmp(this->buffer, state.compare.buffer, AuMin(this->uSize, state.compare.uSize)) == 0;
}
else
{
auto uMask = state.uDownsizeMask.value();
auto &uSrcWord = *AuReinterpretCast<AuUInt32 *>(this->buffer);
auto &uCmpWord = *AuReinterpretCast<AuUInt32 *>(state.compare.buffer);
return (uSrcWord & uMask) == (uCmpWord & uMask);
}
}
AuSPtr<WaitEntry> ProcessWaitContainer::WaitBufferFrom(void *pAddress, AuUInt8 uSize)
{
for (AU_ITERATE_N(i, kDefaultWaitPerProcess))
{
if (this->entries[i].TryAcquire(pAddress, uSize))
{
return AuUnsafeRaiiToShared(&this->entries[i]);
}
}
AuSPtr<WaitEntry> pNew;
{
AuDebug::AddMemoryCrunch();
pNew = AuMakeSharedPanic<WaitEntry>();
AuDebug::DecMemoryCrunch();
}
{
Lock();
this->overflow.push_back(pNew);
Unlock();
}
return pNew;
}
template <typename T>
bool ProcessWaitContainer::IterateAll(T callback)
{
for (AU_ITERATE_N(i, kDefaultWaitPerProcess))
{
auto &entry = this->entries[i];
{
while (AuAtomicTestAndSet(&entry.uAtomic, 0))
{
while (entry.uAtomic)
{
AuThreading::ContextYield();
}
}
if (!entry.pAddress)
{
entry.uAtomic = 0;
}
else
{
AU_LOCK_GUARD(entry.mutex);
entry.uAtomic = 0;
if (!callback(entry))
{
return false;
}
}
}
}
Lock();
for (auto &overflow : this->overflow)
{
AU_LOCK_GUARD(overflow->mutex);
if (!callback(*overflow.get()))
{
return false;
}
}
Unlock();
return true;
}
template <typename T>
bool ProcessWaitContainer::IterateForceNoCreateDuringOp(T callback)
{
bool bRetStatus { true };
for (AU_ITERATE_N(i, kDefaultWaitPerProcess))
{
auto &entry = this->entries[i];
{
while (AuAtomicTestAndSet(&entry.uAtomic, 0))
{
while (entry.uAtomic)
{
AuThreading::ContextYield();
}
}
if (entry.pAddress)
{
AU_LOCK_GUARD(entry.mutex);
if (!callback(entry))
{
for (AU_ITERATE_N(z, i + 1))
{
this->entries[z].uAtomic = 0;
}
return false;
}
}
}
}
Lock();
for (auto &overflow : this->overflow)
{
AU_LOCK_GUARD(overflow->mutex);
if (!callback(*overflow.get()))
{
bRetStatus = false;
break;
}
}
Unlock();
for (AU_ITERATE_N(i, kDefaultWaitPerProcess))
{
auto &entry = this->entries[i];
{
entry.uAtomic = 0;
}
}
return bRetStatus;
}
void ProcessWaitContainer::Lock()
{
while (AuAtomicTestAndSet(&this->uAtomic, 0))
{
while (this->uAtomic)
{
AuThreading::ContextYield();
}
}
}
void ProcessWaitContainer::Unlock()
{
this->uAtomic = 0;
}
void ProcessWaitContainer::Remove(WaitEntry *pParent)
{
Lock();
for (auto itr = this->overflow.begin();
itr != this->overflow.end();
)
{
if ((*itr).get() == pParent)
{
itr = this->overflow.erase(itr);
}
else
{
itr++;
}
}
Unlock();
}
AUKN_SYM bool IsWaitOnRecommended()
{
#if defined(AURORA_IS_MODERNNT_DERIVED)
return pWaitOnAddress &&
AuSwInfo::IsWindows8Point1OrGreater();
#elif defined(AURORA_PLATFORM_LINUX)
return true;
#endif
return false;
}
/// @deprecated
AUKN_SYM const AuList<AuUInt8> &GetValidWordSizes()
{
static const AuList<AuUInt8> kArray =
#if defined(AURORA_IS_MODERNNT_DERIVED)
{ 1, 2, 4, 8 };
#else
{ 4 };
#endif
return kArray;
}
bool WaitOnAddressWide(void *pTargetAddress,
void *pCompareAddress,
AuUInt8 uWordSize,
AuOptional<AuUInt64> qwNanoseconds,
bool bOSSupportsWait
)
{
WaitState state;
SysAssertDbg(uWordSize <= 8);
auto pWaitEntry = gProcessWaitables.WaitBufferFrom(pTargetAddress, uWordSize);
state.compare = WaitBuffer::From(pCompareAddress, uWordSize);
state.qwNanoseconds = qwNanoseconds ? AuOptionalEx<AuUInt64> { qwNanoseconds.value() } : AuOptionalEx<AuUInt64> {}; // from default/zeroable optional, to boolean suffix
auto bResult = pWaitEntry->SleepOn(state);
pWaitEntry->Release();
return bResult;
}
AuTuple<const void *, AuUInt8, AuOptionalEx<AuUInt32>> DecodeAddress(const void *pAddress,
AuUInt32 uWordSize)
{
#if defined(AURORA_IS_MODERNNT_DERIVED)
return AuMakeTuple(pAddress, 0, AuOptionalEx<AuUInt32> {});
#endif
if (uWordSize == 8)
{
return AuMakeTuple(pAddress, 0xFFFFFFFF, 0xFFFFFFFF);
}
auto pRounded = AuPageRound(AuUInt(pAddress), AuUInt(4));
auto uDelta = (AuUInt)(pAddress) - (AuUInt)(pRounded);
AuUInt32 uSizeMask = std::pow(AuUInt64(2), AuUInt64(uWordSize * 8)) - 1ull;
switch (uDelta)
{
case 0:
return AuMakeTuple(pAddress, 0, 0xFFFFFFFF & (uSizeMask << 0));
case 1:
return AuMakeTuple(pAddress, 1, 0xFFFFFF00 & (uSizeMask << 8));
case 2:
return AuMakeTuple(pAddress, 2, 0xFFFF0000 & (uSizeMask << 16));
case 3:
return AuMakeTuple(pAddress, 3, 0xFF000000 & (uSizeMask << 24));
default:
SysPanic("Invalid Branch");
}
}
static bool RunOSWaitOnAddressNoTimed(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 dwWordSize)
{
#if defined(AURORA_IS_MODERNNT_DERIVED)
return pWaitOnAddress((void *)pTargetAddress, (void *)pCompareAddress, dwWordSize, 0);
#endif
#if defined(AURORA_IS_LINUX_DERIVED)
int ret {};
#if defined(AU_CPU_ENDIAN_BIG)
if (dwWordSize == 8)
{
pTargetAddress = AuReinterpretCast<const char *>(pTargetAddress) + 4;
pCompareAddress = AuReinterpretCast<const char *>(pCompareAddress) + 4;
}
#endif
auto uCurrent = *(AuUInt32 *)pCompareAddress;
do
{
ret = futex_wait((AuUInt32 *)pTargetAddress, uCurrent, nullptr);
if (ret == 0)
{
continue;
}
if (ret == EAGAIN || errno == EAGAIN)
{
continue;
}
if (ret == ETIMEDOUT || errno == ETIMEDOUT)
{
return false;
}
}
while (ret == EINTR);
return true;
#endif
return false;
}
static bool RunOSWaitOnAddressTimed(const void *pTargetAddress,
const void *pCompareAddress,
AuUInt8 uWordSize,
AuUInt64 uAbsTime,
AuUInt32 uNanoseconds)
{
#if defined(AURORA_IS_MODERNNT_DERIVED)
auto uMS = AuNSToMS<AuUInt32>(uNanoseconds);
if (!uMS)
{
auto expect = WaitBuffer::From(pCompareAddress, uWordSize);
do
{
if (expect.Compare(pTargetAddress))
{
AuThreading::ContextYield();
}
}
while (uAbsTime > AuTime::SteadyClockNS());
}
else
{
uMS = AuMin<AuUInt32>(uMS, 2000);
(void)pWaitOnAddress((void *)pTargetAddress, (void *)pCompareAddress, uWordSize, uMS);
if (!WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress))
{
return true;
}
AuUInt64 uNow {};
while (uAbsTime > (uNow = AuTime::SteadyClockNS()))
{
uMS = AuNSToMS<AuUInt32>(uAbsTime - uNow);
if (Primitives::DoTryIf([=]()
{
return !WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress);
}))
{
return true;
}
if (!uMS)
{
AuThreading::ContextYield();
}
else
{
(void)pWaitOnAddress((void *)pTargetAddress, (void *)pCompareAddress, uWordSize, uMS);
}
}
}
#endif
#if defined(AURORA_IS_LINUX_DERIVED)
int ret {};
#if defined(AU_CPU_ENDIAN_BIG)
if (uWordSize == 8)
{
pTargetAddress = AuReinterpretCast<const char *>(pTargetAddress) + 4;
pCompareAddress = AuReinterpretCast<const char *>(pCompareAddress) + 4;
}
#endif
auto uCurrent = *(AuUInt32 *)pCompareAddress;
struct timespec tspec;
Time::auabsns2ts(&tspec, uAbsTime);
do
{
ret = futex_wait((AuUInt32 *)pTargetAddress, uCurrent, &tspec);
if (ret == 0)
{
continue;
}
if (ret == EAGAIN || errno == EAGAIN)
{
continue;
}
if (ret == ETIMEDOUT || errno == ETIMEDOUT)
{
return false;
}
}
while (ret == EINTR);
#endif
return !WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress);
}
static void RunOSWaitOnAddressNoTimedNoErrors(const void *pTargetAddress,
const void *pCompareAddress,
WaitState &state)
{
while (WaitBuffer::From(pTargetAddress, state.uWordSize).Compare(state))
{
if (!RunOSWaitOnAddressNoTimed(pTargetAddress, pCompareAddress, state.uWordSize))
{
AuThreading::ContextYield();
}
}
}
static bool RunOSWaitOnAddressTimedNoErrors(const void *pTargetAddress,
const void *pCompareAddress,
WaitState &state)
{
if (!WaitBuffer::From(pTargetAddress, state.uWordSize).Compare(state))
{
return true;
}
(void)RunOSWaitOnAddressTimed(pTargetAddress, pCompareAddress, state.uWordSize, AuTime::SteadyClockNS() + state.qwNanoseconds.value(), state.qwNanoseconds.value());
return !WaitBuffer::From(pTargetAddress, state.uWordSize).Compare(state);
}
static void RunOSWakeNOnAddress(const void *pAddress,
AuUInt32 dwCount)
{
#if defined(AURORA_IS_LINUX_DERIVED)
futex_wake((AuUInt32 *)pAddress, dwCount);
#endif
#if defined(AURORA_IS_MODERNNT_DERIVED)
for (AuUInt i = 0; i < dwCount; i++)
{
pWakeByAddressSingle((void *)pAddress);
}
#endif
}
static void RunOSWakeAllOnAddress(const void *pAddress)
{
#if defined(AURORA_IS_LINUX_DERIVED)
futex_wake((AuUInt32 *)pAddress, INT_MAX);
#endif
#if defined(AURORA_IS_MODERNNT_DERIVED)
pWakeByAddressAll((void *)pAddress);
#endif
}
AUKN_SYM bool WaitOnAddress(void *pTargetAddress,
void *pCompareAddress,
AuUInt8 uWordSize,
AuUInt64 qwNanoseconds)
{
bool bWaitOnAddress = IsWaitOnRecommended();
if (bWaitOnAddress)
{
auto [pWaitAddress, uDelta, uMask] = DecodeAddress(pTargetAddress, uWordSize);
auto pCompareAddress2 = AuReinterpretCast<char *>(pCompareAddress) - uDelta;
WaitState state;
state.uDownsizeMask = uMask;
state.compare = uMask ?
WaitBuffer::From(pCompareAddress2, 4) :
WaitBuffer::From(pCompareAddress2, uWordSize);
state.uWordSize = uMask ? 4 : uWordSize;
if (!qwNanoseconds)
{
RunOSWaitOnAddressNoTimedNoErrors(pWaitAddress, pCompareAddress2, state);
return true;
}
else
{
state.qwNanoseconds = qwNanoseconds;
return RunOSWaitOnAddressTimedNoErrors(pWaitAddress, pCompareAddress2, state);
}
}
else
{
if (TryWaitOnAddress(pTargetAddress, pCompareAddress, uWordSize))
{
return true;
}
return WaitOnAddressWide(pTargetAddress, pCompareAddress, uWordSize, qwNanoseconds, false);
}
return false;
}
AUKN_SYM bool TryWaitOnAddress(void *pTargetAddress,
void *pCompareAddress,
AuUInt8 uWordSize)
{
return Primitives::DoTryIf([=]()
{
return !WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress);
});
}
AUKN_SYM void WakeNOnAddress(void *pTargetAddress,
AuUInt8 uNMaximumThreads)
{
if (IsWaitOnRecommended())
{
RunOSWakeNOnAddress(pTargetAddress, uNMaximumThreads);
}
else
{
(void)gProcessWaitables.IterateForceNoCreateDuringOp([&](WaitEntry &entry) -> bool
{
if (!uNMaximumThreads)
{
return false;
}
if (entry.TryWakeNoLockNoReallyNoLock(pTargetAddress))
{
uNMaximumThreads--;
}
return uNMaximumThreads != 0;
});
}
}
AUKN_SYM void WakeOnAddress(void *pTargetAddress)
{
WakeNOnAddress(pTargetAddress, 1);
}
AUKN_SYM void WakeAllOnAddress(void *pTargetAddress)
{
if (IsWaitOnRecommended())
{
RunOSWakeAllOnAddress(pTargetAddress);
}
else
{
(void)gProcessWaitables.IterateForceNoCreateDuringOp([=](WaitEntry &entry) -> bool
{
entry.TryWakeNoLockNoReallyNoLock(pTargetAddress);
return true;
});
}
}
}