/*** Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuWakeOnAddress.cpp Date: 2023-3-10 Author: Reece ***/ #include #include "AuWakeOnAddress.hpp" #include "Primitives/SMPYield.hpp" namespace Aurora::Threading /*::Primitives for this file only!*/ { #if defined(AURORA_IS_LINUX_DERIVED) static int futex_wait(uint32_t *addr, uint32_t expected, const struct timespec *timeout) { if (timeout) { return futex(addr, FUTEX_WAIT_BITSET, expected, timeout, 0, FUTEX_BITSET_MATCH_ANY); } else { return futex(addr, FUTEX_WAIT, expected, timeout, 0, 0); } } static int futex_wake(uint32_t *addr, uint32_t nthreads) { return futex(addr, FUTEX_WAKE, nthreads, 0, 0, 0); } #endif static ProcessWaitContainer gProcessWaitables; bool WaitEntry::TryAcquire(const void *pAddress, AuUInt8 uSize) { while (AuAtomicTestAndSet(&this->uAtomic, 0)) { while (this->uAtomic) { AuThreading::ContextYield(); } } //AU_LOCK_GUARD(this->mutex); if (this->pAddress) { this->uAtomic = 0; return false; } this->pAddress = pAddress; this->uSize = uSize; this->uAtomic = 0; return true; } void WaitEntry::Release() { if (this->bOverflow) { gProcessWaitables.Remove(this); this->bOverflow = false; } AuResetMember(this->uSize); AuResetMember(this->pAddress); } WaitEntry::WaitEntry() : variable(AuUnsafeRaiiToShared(&this->mutex)) { } WaitEntry::~WaitEntry() { this->Release(); } bool WaitEntry::SleepOn(WaitState &state) { AU_LOCK_GUARD(this->mutex); if (state.qwNanoseconds) { if (!WaitBuffer::From(this->pAddress, this->uSize).Compare(state)) { return true; } auto uNow = AuTime::SteadyClockNS(); auto uEndTime = uNow + state.qwNanoseconds.value(); #if defined(AURORA_IS_POSIX_DERIVED) struct timespec tspec; Time::auabsns2ts(&tspec, uEndTime); #endif while (uNow < uEndTime) { if (!WaitBuffer::From(this->pAddress, this->uSize).Compare(state)) { return true; } auto uTimeRemNS = uEndTime - uNow; auto uTimeRemMS = AuNSToMS(uTimeRemNS); #if defined(AURORA_IS_POSIX_DERIVED) auto pCond = reinterpret_cast(&this->variable.pthreadCv_); auto mutex = reinterpret_cast(this->mutex->GetOSHandle()); int ret {}; do { ret = ::pthread_cond_timedwait(pCond, mutex, &tspec); if (ret == 0) { break; } if (ret == ETIMEDOUT) { return !WaitBuffer::From(this->pAddress, this->uSize).Compare(state); } } while (ret == EINTR); #else if (!uTimeRemMS) { AuThreading::ContextYield(); uNow = AuTime::SteadyClockNS(); continue; } this->variable.WaitForSignal(uTimeRemMS); #endif uNow = AuTime::SteadyClockNS(); } return !WaitBuffer::From(this->pAddress, this->uSize).Compare(state); } else { while (WaitBuffer::From(this->pAddress, this->uSize).Compare(state)) { this->variable.WaitForSignal(0); } return true; } return false; } bool WaitEntry::TryWake(const void *pAddress) { while (AuAtomicTestAndSet(&this->uAtomic, 0)) { while (this->uAtomic) { AuThreading::ContextYield(); } } auto bRet = TryWakeNoLock(pAddress); if (!bRet) { this->uAtomic = 0; } return bRet; } bool WaitEntry::TryWakeNoLockNoReallyNoLock(const void *pAddress) { if (AuReinterpretCast(this->pAddress) > AuReinterpretCast(pAddress) || AuReinterpretCast(this->pAddress) + this->uSize <= AuReinterpretCast(pAddress)) { return false; } this->uAtomic = 0; this->variable.Signal(); return true; } bool WaitEntry::TryWakeNoLock(const void *pAddress) { if (AuReinterpretCast(this->pAddress) > AuReinterpretCast(pAddress) || AuReinterpretCast(this->pAddress) + this->uSize <= AuReinterpretCast(pAddress)) { return false; } AU_LOCK_GUARD(this->mutex); this->uAtomic = 0; this->variable.Signal(); return true; } WaitBuffer WaitBuffer::From(const void *pBuf, AuUInt8 uSize) { WaitBuffer wait; AuMemcpy(wait.buffer, pBuf, uSize); wait.uSize = uSize; return AuMove(wait); } bool WaitBuffer::Compare(const void *pBuf) { return AuMemcmp(this->buffer, pBuf, this->uSize) == 0; } bool WaitBuffer::Compare(WaitState &state) { if (!state.uDownsizeMask) { return AuMemcmp(this->buffer, state.compare.buffer, AuMin(this->uSize, state.compare.uSize)) == 0; } else { auto uMask = state.uDownsizeMask.value(); auto &uSrcWord = *AuReinterpretCast(this->buffer); auto &uCmpWord = *AuReinterpretCast(state.compare.buffer); return (uSrcWord & uMask) == (uCmpWord & uMask); } } AuSPtr ProcessWaitContainer::WaitBufferFrom(void *pAddress, AuUInt8 uSize) { for (AU_ITERATE_N(i, kDefaultWaitPerProcess)) { if (this->entries[i].TryAcquire(pAddress, uSize)) { return AuUnsafeRaiiToShared(&this->entries[i]); } } AuSPtr pNew; { AuDebug::AddMemoryCrunch(); pNew = AuMakeSharedPanic(); AuDebug::DecMemoryCrunch(); } { Lock(); this->overflow.push_back(pNew); Unlock(); } return pNew; } template bool ProcessWaitContainer::IterateAll(T callback) { for (AU_ITERATE_N(i, kDefaultWaitPerProcess)) { auto &entry = this->entries[i]; { while (AuAtomicTestAndSet(&entry.uAtomic, 0)) { while (entry.uAtomic) { AuThreading::ContextYield(); } } if (!entry.pAddress) { entry.uAtomic = 0; } else { AU_LOCK_GUARD(entry.mutex); entry.uAtomic = 0; if (!callback(entry)) { return false; } } } } Lock(); for (auto &overflow : this->overflow) { AU_LOCK_GUARD(overflow->mutex); if (!callback(*overflow.get())) { return false; } } Unlock(); return true; } template bool ProcessWaitContainer::IterateForceNoCreateDuringOp(T callback) { bool bRetStatus { true }; for (AU_ITERATE_N(i, kDefaultWaitPerProcess)) { auto &entry = this->entries[i]; { while (AuAtomicTestAndSet(&entry.uAtomic, 0)) { while (entry.uAtomic) { AuThreading::ContextYield(); } } if (entry.pAddress) { AU_LOCK_GUARD(entry.mutex); if (!callback(entry)) { for (AU_ITERATE_N(z, i + 1)) { this->entries[z].uAtomic = 0; } return false; } } } } Lock(); for (auto &overflow : this->overflow) { AU_LOCK_GUARD(overflow->mutex); if (!callback(*overflow.get())) { bRetStatus = false; break; } } Unlock(); for (AU_ITERATE_N(i, kDefaultWaitPerProcess)) { auto &entry = this->entries[i]; { entry.uAtomic = 0; } } return bRetStatus; } void ProcessWaitContainer::Lock() { while (AuAtomicTestAndSet(&this->uAtomic, 0)) { while (this->uAtomic) { AuThreading::ContextYield(); } } } void ProcessWaitContainer::Unlock() { this->uAtomic = 0; } void ProcessWaitContainer::Remove(WaitEntry *pParent) { Lock(); for (auto itr = this->overflow.begin(); itr != this->overflow.end(); ) { if ((*itr).get() == pParent) { itr = this->overflow.erase(itr); } else { itr++; } } Unlock(); } AUKN_SYM bool IsWaitOnRecommended() { #if defined(AURORA_IS_MODERNNT_DERIVED) return pWaitOnAddress && AuSwInfo::IsWindows8Point1OrGreater(); #elif defined(AURORA_PLATFORM_LINUX) return true; #endif return false; } /// @deprecated AUKN_SYM const AuList &GetValidWordSizes() { static const AuList kArray = #if defined(AURORA_IS_MODERNNT_DERIVED) { 1, 2, 4, 8 }; #else { 4 }; #endif return kArray; } bool WaitOnAddressWide(void *pTargetAddress, void *pCompareAddress, AuUInt8 uWordSize, AuOptional qwNanoseconds, bool bOSSupportsWait ) { WaitState state; SysAssertDbg(uWordSize < 8); auto pWaitEntry = gProcessWaitables.WaitBufferFrom(pTargetAddress, uWordSize); state.compare = WaitBuffer::From(pCompareAddress, uWordSize); state.qwNanoseconds = qwNanoseconds ? AuOptionalEx { qwNanoseconds.value() } : AuOptionalEx {}; // from default/zeroable optional, to boolean suffix auto bResult = pWaitEntry->SleepOn(state); pWaitEntry->Release(); return bResult; } AuTuple> DecodeAddress(const void *pAddress, AuUInt32 uSizeMask) { #if defined(AURORA_IS_MODERNNT_DERIVED) return AuMakeTuple(pAddress, 0, AuOptionalEx {}); #endif auto pRounded = AuPageRound(AuUInt(pAddress), AuUInt(4)); auto uDelta = (AuUInt)(pAddress) - (AuUInt)(pRounded); switch (uDelta) { case 0: return AuMakeTuple(pAddress, 0, 0xFFFFFFFF & (uSizeMask << 0)); case 1: return AuMakeTuple(pAddress, 1, 0xFFFFFF00 & (uSizeMask << 8)); case 2: return AuMakeTuple(pAddress, 2, 0xFFFF0000 & (uSizeMask << 16)); case 3: return AuMakeTuple(pAddress, 3, 0xFF000000 & (uSizeMask << 24)); default: SysPanic("Invalid Branch"); } } static bool RunOSWaitOnAddressNoTimed(const void *pTargetAddress, const void *pCompareAddress, AuUInt8 dwWordSize) { #if defined(AURORA_IS_MODERNNT_DERIVED) return pWaitOnAddress((void *)pTargetAddress, (void *)pCompareAddress, dwWordSize, 0); #endif #if defined(AURORA_IS_LINUX_DERIVED) int ret {}; #if defined(AU_CPU_ENDIAN_LITTLE) if (dwWordSize == 8) { pTargetAddress = AuReinterpretCast(pTargetAddress) + 4; pCompareAddress = AuReinterpretCast(pCompareAddress) + 4; } #endif auto uCurrent = *(AuUInt32 *)pCompareAddress; do { ret = futex_wait((AuUInt32 *)pTargetAddress, uCurrent, nullptr); if (ret == 0) { continue; } if (ret == EAGAIN || errno == EAGAIN) { continue; } if (ret == ETIMEDOUT || errno == ETIMEDOUT) { return false; } } while (ret == EINTR); return true; #endif return false; } static bool RunOSWaitOnAddressTimed(const void *pTargetAddress, const void *pCompareAddress, AuUInt8 uWordSize, AuUInt64 uAbsTime, AuUInt32 uNanoseconds) { #if defined(AURORA_IS_MODERNNT_DERIVED) auto uMS = AuNSToMS(uNanoseconds); if (!uMS) { auto expect = WaitBuffer::From(pCompareAddress, uWordSize); do { if (expect.Compare(pTargetAddress)) { AuThreading::ContextYield(); } } while (uAbsTime > AuTime::SteadyClockNS()); } else { uMS = AuMin(uMS, 2000); (void)pWaitOnAddress((void *)pTargetAddress, (void *)pCompareAddress, uWordSize, uMS); if (!WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress)) { return true; } AuUInt64 uNow {}; while (uAbsTime > (uNow = AuTime::SteadyClockNS())) { uMS = AuNSToMS(uAbsTime - uNow); if (Primitives::DoTryIf([=]() { return !WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress); })) { return true; } if (!uMS) { AuThreading::ContextYield(); } else { (void)pWaitOnAddress((void *)pTargetAddress, (void *)pCompareAddress, uWordSize, uMS); } } } #endif #if defined(AURORA_IS_LINUX_DERIVED) int ret {}; #if defined(AU_CPU_ENDIAN_LITTLE) if (uWordSize == 8) { pTargetAddress = AuReinterpretCast(pTargetAddress) + 4; pCompareAddress = AuReinterpretCast(pCompareAddress) + 4; } #endif auto uCurrent = *(AuUInt32 *)pCompareAddress; struct timespec tspec; Time::auabsns2ts(&tspec, uAbsTime); do { ret = futex_wait((AuUInt32 *)pTargetAddress, uCurrent, &tspec); if (ret == 0) { continue; } if (ret == EAGAIN || errno == EAGAIN) { continue; } if (ret == ETIMEDOUT || errno == ETIMEDOUT) { return false; } } while (ret == EINTR); #endif return !WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress); } static void RunOSWaitOnAddressNoTimedNoErrors(const void *pTargetAddress, const void *pCompareAddress, WaitState &state) { while (WaitBuffer::From(pTargetAddress, state.uWordSize).Compare(state)) { if (!RunOSWaitOnAddressNoTimed(pTargetAddress, pCompareAddress, state.uWordSize)) { AuThreading::ContextYield(); } } } static bool RunOSWaitOnAddressTimedNoErrors(const void *pTargetAddress, const void *pCompareAddress, WaitState &state) { if (!WaitBuffer::From(pTargetAddress, state.uWordSize).Compare(state)) { return true; } (void)RunOSWaitOnAddressTimed(pTargetAddress, pCompareAddress, state.uWordSize, AuTime::SteadyClockNS() + state.qwNanoseconds.value(), state.qwNanoseconds.value()); return !WaitBuffer::From(pTargetAddress, state.uWordSize).Compare(state); } static void RunOSWakeNOnAddress(const void *pAddress, AuUInt32 dwCount) { #if defined(AURORA_IS_LINUX_DERIVED) futex_wake((AuUInt32 *)pAddress, dwCount); #endif #if defined(AURORA_IS_MODERNNT_DERIVED) for (AuUInt i = 0; i < dwCount; i++) { pWakeByAddressSingle((void *)pAddress); } #endif } static void RunOSWakeAllOnAddress(const void *pAddress) { #if defined(AURORA_IS_LINUX_DERIVED) futex_wake((AuUInt32 *)pAddress, INT_MAX); #endif #if defined(AURORA_IS_MODERNNT_DERIVED) pWakeByAddressAll((void *)pAddress); #endif } AUKN_SYM bool WaitOnAddress(void *pTargetAddress, void *pCompareAddress, AuUInt8 uWordSize, AuUInt64 qwNanoseconds) { bool bWaitOnAddress = IsWaitOnRecommended(); if (bWaitOnAddress) { auto [pWaitAddress, uDelta, uMask] = DecodeAddress(pTargetAddress, uWordSize); auto pCompareAddress2 = AuReinterpretCast(pCompareAddress) - uDelta; WaitState state; state.uDownsizeMask = uMask; state.compare = uMask ? WaitBuffer::From(pCompareAddress2, 4) : WaitBuffer::From(pCompareAddress2, uWordSize); state.uWordSize = uMask ? 4 : uWordSize; if (!qwNanoseconds) { RunOSWaitOnAddressNoTimedNoErrors(pWaitAddress, pCompareAddress2, state); return true; } else { state.qwNanoseconds = qwNanoseconds; return RunOSWaitOnAddressTimedNoErrors(pWaitAddress, pCompareAddress2, state); } } else { return WaitOnAddressWide(pTargetAddress, pCompareAddress, uWordSize, qwNanoseconds, false); } return false; } AUKN_SYM bool TryWaitOnAddress(void *pTargetAddress, void *pCompareAddress, AuUInt8 uWordSize) { return Primitives::DoTryIf([=]() { return !WaitBuffer::From(pCompareAddress, uWordSize).Compare(pTargetAddress); }); } AUKN_SYM void WakeNOnAddress(void *pTargetAddress, AuUInt8 uNMaximumThreads) { if (IsWaitOnRecommended()) { RunOSWakeNOnAddress(pTargetAddress, uNMaximumThreads); } else { (void)gProcessWaitables.IterateForceNoCreateDuringOp([&](WaitEntry &entry) -> bool { if (!uNMaximumThreads) { return false; } if (entry.TryWakeNoLockNoReallyNoLock(pTargetAddress)) { uNMaximumThreads--; } return uNMaximumThreads != 0; }); } } AUKN_SYM void WakeOnAddress(void *pTargetAddress) { WakeNOnAddress(pTargetAddress, 1); } AUKN_SYM void WakeAllOnAddress(void *pTargetAddress) { if (IsWaitOnRecommended()) { RunOSWakeAllOnAddress(pTargetAddress); } else { (void)gProcessWaitables.IterateForceNoCreateDuringOp([=](WaitEntry &entry) -> bool { entry.TryWakeNoLockNoReallyNoLock(pTargetAddress); return true; }); } } }