AuroraRuntime/Source/Threading/Primitives/AuConditionVariable.NT.cpp

517 lines
16 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuConditionVariable.Win32.cpp
Date: 2021-6-12
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuConditionVariable.Generic.hpp"
#include <Time/Time.hpp>
#include "SMTYield.hpp"
#include "../AuWakeInternal.hpp"
#if !defined(_AURUNTIME_GENERICCV)
#if !defined(NTSTATUS_TIMEOUT)
#define NTSTATUS_TIMEOUT 0x102
#endif
namespace Aurora::Threading::Primitives
{
ConditionVariableNT::ConditionVariableNT()
{
#if defined(AURORA_FORCE_SRW_LOCKS)
::InitializeConditionVariable(&this->winCond_);
#endif
}
bool ConditionVariableNT::WaitForSignalNsEx(Win32ConditionMutex *pMutex, AuUInt64 qwTimeout)
{
#if !defined(AURORA_FORCE_SRW_LOCKS)
bool bRet { true };
auto pThatMutex = reinterpret_cast<NT4Mutex *>(&pMutex->lock_);
if (qwTimeout)
{
//#if defined(AU_TRUST_NT_KERNEL_SCHED_TIMEOUT)
auto uEndTimeSteady = gUseNativeWaitCondvar ? AuTime::SteadyClockNS() + qwTimeout : 0; // we could nuke this again, if i really wanted to
// #endif
auto uEndTimeWall = AuTime::CurrentClockNS() + qwTimeout;
auto uTargetTimeNt = AuTime::ConvertTimestampNs(uEndTimeWall);
bool bIOU {};
while (true)
{
LARGE_INTEGER word;
word.QuadPart = uTargetTimeNt;
if (bRet)
{
while (true)
{
auto uNow = this->wlist;
auto waiting = uNow >> kShiftCountByBits;
auto uNext = ((waiting + 1) << kShiftCountByBits) | (!bool(waiting)) | (uNow & 1);
if (AuAtomicCompareExchange(&this->wlist, uNext, uNow) == uNow)
{
break;
}
}
pMutex->Unlock();
if (gUseNativeWaitCondvar)
{
// Reverted: 5b495f7fd9495aa55395666e166ac499955215dc
if (ThrdCfg::gPreferNtCondvarModernWinSpin)
{
if (!bIOU)
{
#if 0
bIOU = CheckOut();
#else
if (CheckOut())
{
pMutex->Lock();
return true;
}
#endif
}
}
AuUInt8 uBlockBit { 1 };
bRet = InternalLTSWaitOnAddressHighRes(&this->wlist, &uBlockBit, 1, uEndTimeSteady);
}
else
{
// Reverted: 5b495f7fd9495aa55395666e166ac499955215dc
if (ThrdCfg::gPreferNtCondvarOlderWinSpin)
{
if (!bIOU)
{
bIOU = CheckOut();
}
}
bRet = pNtWaitForKeyedEvent(gKeyedEventHandle, (void *)&this->wlist, 0, &word) != NTSTATUS_TIMEOUT;
}
pMutex->Lock();
}
else /* unblock NtReleaseKeyedEvent after an atomic this->wlist change <-> NtReleaseKeyedEvent race condition. */
{ /* this->wlist waiters should still be accounting for us, leading to a NtReleaseKeyedEvent block condition*/
LARGE_INTEGER word;
word.QuadPart = 0;
if (gUseNativeWaitCondvar)
{
pMutex->Unlock();
// do not do a syscall (worst case) under ::unlock(...) barrier/pMutex's lock
// best case: rdtsc extrapolation, if nts hal trusts our system
if (uEndTimeSteady <= AuTime::SteadyClockNS())
{
pMutex->Lock();
return bIOU;
}
AuUInt8 uBlockBit { 1 };
InternalLTSWakeAll((void *)&this->wlist); // this is kinda sad
bRet = InternalLTSWaitOnAddressHighRes(&this->wlist, &uBlockBit, 1, uEndTimeSteady); // why?
}
else
{
pMutex->Unlock();
// Obligatory Windows XP+ resched
bRet = pNtWaitForKeyedEvent(gKeyedEventHandle, (void *)&this->wlist, 0, nullptr) != NTSTATUS_TIMEOUT;
}
pMutex->Lock();
}
#if !defined(AU_TRUST_NT_KERNEL_SCHED_TIMEOUT)
if (!bRet)
#else
if (!bRet && uEndTimeSteady <= AuTime::SteadyClockNS())
#endif
{
auto uNow = this->wlist;
auto uOld = (uNow >> kShiftCountByBits);
if (uOld == 0)
{
// broadcast has woken everyone up
if (bIOU || CheckOut()) // the cope acquire
{
// in which case we're good
return true;
}
else
{
// ...and now we might owe NtReleaseKeyedEvent a thread >:(
bRet = false;
continue;
}
}
// go for an atomic decrement while racing against ::Signal and ::Broadcast
auto waiting = uOld - 1u;
auto uNext = waiting << kShiftCountByBits;
if (AuAtomicCompareExchange(&this->wlist, uNext, uNow) == uNow)
{
// break if successful
return bIOU;
}
else
{
// block again because we couldn't decrement the counter
// broadcast still thinks we're asleep
// ...and we still owe NtReleaseKeyedEvent 1 therad
continue;
}
}
else
{
// we good?
if (bIOU || CheckOut())
{
return true;
}
}
}
}
else
{
bool bIOU {};
while (true)
{
while (true)
{
auto uNow = this->wlist;
auto waiting = uNow >> kShiftCountByBits;
auto uNext = ((waiting + 1) << kShiftCountByBits) | (!bool(waiting)) | (uNow & 1);
if (AuAtomicCompareExchange(&this->wlist, uNext, uNow) == uNow)
{
break;
}
}
pMutex->Unlock();
if (gUseNativeWaitCondvar)
{
if (ThrdCfg::gPreferNtCondvarModernWinSpin)
{
if (!bIOU)
{
if (CheckOut())
{
pMutex->Lock();
return true;
}
}
}
AuUInt8 uBlockBit { 1 };
bRet = InternalLTSWaitOnAddressHighRes(&this->wlist, &uBlockBit, 1, 0);
}
else
{
if (ThrdCfg::gPreferNtCondvarOlderWinSpin)
{
if (!bIOU)
{
bIOU = CheckOut();
}
}
pNtWaitForKeyedEvent(gKeyedEventHandle, (void *)&this->wlist, 0, nullptr);
}
pMutex->Lock();
if (bIOU || CheckOut())
{
return bRet;
}
}
}
#else
AuInt64 uEndTime = qwTimeout ? AuTime::SteadyClockNS() + qwTimeout : 0;
while (true)
{
AuInt64 uSecondTimeout = INFINITE;
if (qwTimeout)
{
uSecondTimeout = uEndTime - AuTime::SteadyClockNS();
if (uSecondTimeout <= 0)
{
return false;
}
uSecondTimeout = AuNSToMS<AuUInt64>(uSecondTimeout);
if (!uSecondTimeout)
{
SMPPause();
AuThreading::ContextYield();
continue;
}
}
auto bOk = ::SleepConditionVariableSRW(&this->winCond_, reinterpret_cast<PSRWLOCK>(pMutex->GetOSHandle()), uSecondTimeout, 0);
if (bOk)
{
return true;
}
else
{
SysAssert(GetLastError() == ERROR_TIMEOUT, "SleepConditionVariable failure");
}
}
#endif
}
bool ConditionVariableNT::CheckOut()
{
#if defined(AURORA_FORCE_SRW_LOCKS)
return false;
#else
return DoTryIf([&]()
{
auto uSignalNow = this->signalCount;
if (uSignalNow == 0)
{
return false;
}
auto uSignalNext = uSignalNow - 1;
if (AuAtomicCompareExchange(&this->signalCount, uSignalNext, uSignalNow) != uSignalNow)
{
return false;
}
if constexpr (kBoolRequiredLateSet)
{
if (uSignalNext == 0)
{
InterlockedOr((volatile LONG *)&this->wlist, 1);
}
}
return true;
});
#endif
}
void ConditionVariableNT::Signal()
{
#if !defined(AURORA_FORCE_SRW_LOCKS)
auto original = this->wlist;
auto expected = original;
expected = expected >> kShiftCountByBits;
if (expected)
{
AuAtomicAdd(&this->signalCount, 1u);
while (expected)
{
if (AuAtomicCompareExchange(&this->wlist, ((expected - 1) << kShiftCountByBits) /*intentional clear*/, original) == original)
{
if (gUseNativeWaitCondvar)
{
InternalLTSWakeOne((void *)&this->wlist);
}
else
{
pNtReleaseKeyedEvent(gKeyedEventHandle, (void *)&this->wlist, FALSE, nullptr);
}
return;
}
original = this->wlist;
expected = original >> kShiftCountByBits;
}
}
#else
::WakeConditionVariable(&this->winCond_);
#endif
}
void ConditionVariableNT::Broadcast()
{
#if !defined(AURORA_FORCE_SRW_LOCKS)
if (gUseNativeWaitCondvar)
{
auto original = this->wlist;
auto expected = original;
expected = expected >> kShiftCountByBits;
if (!expected)
{
return;
}
AuAtomicAdd(&this->signalCount, expected);
auto uAwoken = expected;
while (true)
{
if (AuAtomicCompareExchange(&this->wlist,
expected - uAwoken,
original) == original)
{
InternalLTSWakeAll((void *)&this->wlist);
return;
}
else
{
original = this->wlist;
expected = original >> kShiftCountByBits;
uAwoken = AuMin<AuUInt32>(uAwoken, expected);
}
}
}
else
{
auto original = this->wlist;
auto expected = original;
expected = expected >> kShiftCountByBits;
auto uBroadcastIterations = expected;
while (expected && uBroadcastIterations)
{
AuAtomicAdd(&this->signalCount, 1u);
while (expected && uBroadcastIterations)
{
bool bBreak {};
if (AuAtomicCompareExchange(&this->wlist, ((expected - 1) << kShiftCountByBits) /*intentional clear*/, original) == original)
{
pNtReleaseKeyedEvent(gKeyedEventHandle, (void *)&this->wlist, FALSE, nullptr);
uBroadcastIterations--;
bBreak = true;
}
original = this->wlist;
expected = original >> kShiftCountByBits;
if (bBreak)
{
break;
}
}
}
}
#else
::WakeAllConditionVariable(&this->winCond_);
#endif
}
void ConditionVariableNT::BroadcastN(AuUInt32 nBroadcast)
{
#if !defined(AURORA_FORCE_SRW_LOCKS)
if (gUseNativeWaitCondvar)
{
auto original = this->wlist;
auto expected = original;
expected = AuMin(nBroadcast, expected >> kShiftCountByBits);
if (!expected)
{
return;
}
AuAtomicAdd(&this->signalCount, expected);
auto uAwoken = expected;
while (true)
{
auto uCount = expected - uAwoken;
if (AuAtomicCompareExchange(&this->wlist,
uCount,
original) == original)
{
InternalLTSWakeCount((void *)&this->wlist, uCount);
return;
}
else
{
original = this->wlist;
expected = original >> kShiftCountByBits;
uAwoken = AuMin<AuUInt32>(uAwoken, expected);
}
}
}
else
{
auto original = this->wlist;
auto expected = original;
expected = expected >> kShiftCountByBits;
auto uBroadcastIterations = AuMin(nBroadcast, expected);
while (expected && uBroadcastIterations)
{
AuAtomicAdd(&this->signalCount, 1u);
while (expected && uBroadcastIterations)
{
bool bBreak {};
if (AuAtomicCompareExchange(&this->wlist, ((expected - 1) << kShiftCountByBits) /*intentional clear*/, original) == original)
{
pNtReleaseKeyedEvent(gKeyedEventHandle, (void *)&this->wlist, FALSE, nullptr);
uBroadcastIterations--;
bBreak = true;
}
original = this->wlist;
expected = original >> kShiftCountByBits;
if (bBreak)
{
break;
}
}
}
}
#else
::WakeAllConditionVariable(&this->winCond_);
#endif
}
AUKN_SYM IConditionVariable *ConditionVariableNew(const AuSPtr<IConditionMutex> &pMutex)
{
return _new ConditionVariableImpl(pMutex);
}
AUKN_SYM void ConditionVariableRelease(IConditionVariable *pCV)
{
AuSafeDelete<ConditionVariableImpl *>(pCV);
}
AUROXTL_INTERFACE_SOO_SRC(ConditionVariable, ConditionVariableImpl, (const AuSPtr<IConditionMutex> &, pMutex))
}
#endif