[*] RWLock: WakeOnAddress optimization on wait to prevent mutex congestion on modern OSes

This commit is contained in:
Reece Wilson 2023-08-23 14:52:47 +01:00
parent 30fc75cbab
commit d79cb4f3ca
6 changed files with 426 additions and 142 deletions

View File

@ -371,6 +371,7 @@ namespace Aurora
AuUInt64 uAdaptiveSpinCUCnt4 : 4 { 2 };
AuUInt64 uAdaptiveSpinCUCnt8 : 4 { 3 };
AuUInt64 uAdaptiveSpinCUCnt16 : 4 { 4 };
AuUInt64 bPreferFutexRWLock : 1 { true };
};
struct DummyConfig

View File

@ -29,7 +29,7 @@ namespace Aurora::Threading::Primitives
* @brief Allows for read-to-write upgrades when the decision to esclate is made based upon a shared resource
* which would be lost by unlocking and relocking in exclusive mode.
* You must be careful to consider how this is used to prevent dead-locks
* @param timeout
* @param timeout in relative nanoseconds
* @return
*/
virtual bool UpgradeReadToWrite(AuUInt64 timeout) = 0;

View File

@ -18,6 +18,12 @@ namespace Aurora::Threading::Primitives
#define ViewParent ((T *)(((char *)this) - (bIsReadView ? RWLockImpl<true>::kOffsetOfRead : RWLockImpl<true>::kOffsetOfWrite)))
#endif
#if defined(AURORA_COMPILER_MSVC)
#define RWLOCK_REORDER_BARRIER() ::MemoryBarrier();
#else
#define RWLOCK_REORDER_BARRIER()
#endif
static const auto kRWThreadWriterHardContextSwitchBias = 15;
template<bool bIsReadView, typename T>
@ -137,6 +143,18 @@ namespace Aurora::Threading::Primitives
#endif
}
template<bool bIsWriteRecursionAllowed>
AuUInt32 *RWLockImpl<bIsWriteRecursionAllowed>::GetFutexCondition()
{
return (AuUInt32 *)&this->state_;
}
template<bool bIsWriteRecursionAllowed>
AuUInt32 *RWLockImpl<bIsWriteRecursionAllowed>::GetFutexConditionWriter()
{
return (AuUInt32 *)this->conditionVariableWriter_;
}
template<bool bIsWriteRecursionAllowed>
bool RWLockImpl<bIsWriteRecursionAllowed>::LockReadNSAbs(AuUInt64 uTimeout)
{
@ -152,31 +170,41 @@ namespace Aurora::Threading::Primitives
if (iCurState < 0)
{
AU_LOCK_GUARD(this->mutex_);
iCurState = this->state_;
if (iCurState < 0)
if (gUseFutexRWLock)
{
AuInt64 iSecondTimeout {};
if (uTimeout)
if (!WaitOnAddressSteady((const void *)&this->state_, &iCurState, sizeof(iCurState), uTimeout))
{
iSecondTimeout = AuInt64(uTimeout) - AuTime::SteadyClockNS();
return false;
}
}
else
{
AU_LOCK_GUARD(this->mutex_);
if (iSecondTimeout <= 0)
iCurState = this->state_;
if (iCurState < 0)
{
AuInt64 iSecondTimeout {};
if (uTimeout)
{
iSecondTimeout = AuInt64(uTimeout) - AuTime::SteadyClockNS();
if (iSecondTimeout <= 0)
{
return false;
}
}
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
if (!this->GetCondition().WaitForSignalNS(iSecondTimeout))
#else
if (!this->GetCondition().WaitForSignalNsEx(&this->mutex_, iSecondTimeout))
#endif
{
return false;
}
}
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
if (!this->GetCondition().WaitForSignalNS(iSecondTimeout))
#else
if (!this->GetCondition().WaitForSignalNsEx(&this->mutex_, iSecondTimeout))
#endif
{
return false;
}
}
}
}
@ -208,31 +236,41 @@ namespace Aurora::Threading::Primitives
if (iCurState < 0)
{
AU_LOCK_GUARD(this->mutex_);
iCurState = this->state_;
if (iCurState < 0)
if (gUseFutexRWLock)
{
AuInt64 iSecondTimeout {};
if (uTimeout)
if (!WaitOnAddressSteady((const void *)&this->state_, &iCurState, sizeof(iCurState), uEndTime))
{
iSecondTimeout = uEndTime - AuTime::SteadyClockNS();
return false;
}
}
else
{
AU_LOCK_GUARD(this->mutex_);
if (iSecondTimeout <= 0)
iCurState = this->state_;
if (iCurState < 0)
{
AuInt64 iSecondTimeout {};
if (uTimeout)
{
iSecondTimeout = uEndTime - AuTime::SteadyClockNS();
if (iSecondTimeout <= 0)
{
return false;
}
}
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
if (!this->GetCondition().WaitForSignalNS(iSecondTimeout))
#else
if (!this->GetCondition().WaitForSignalNsEx(&this->mutex_, iSecondTimeout))
#endif
{
return false;
}
}
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
if (!this->GetCondition().WaitForSignalNS(iSecondTimeout))
#else
if (!this->GetCondition().WaitForSignalNsEx(&this->mutex_, iSecondTimeout))
#endif
{
return false;
}
}
}
}
@ -254,75 +292,58 @@ namespace Aurora::Threading::Primitives
}
else
{
auto uOld = this->state_;
if (uOld < 0)
if (gUseFutexRWLock)
{
if (this->reentrantWriteLockHandle_ == GetThreadCookie())
if (DoTryIf([=]()
{
return this->LockWriteNSAbsSecondPath();
}))
{
AuAtomicSub(&this->state_, 1);
return true;
}
}
else if (uOld == 0)
else
{
if (AuAtomicCompareExchange(&this->state_, -1, uOld) == uOld)
if (this->LockWriteNSAbsSecondPath())
{
this->reentrantWriteLockHandle_ = GetThreadCookie();
return true;
}
}
}
AU_LOCK_GUARD(this->mutex_);
AuAtomicAdd(&this->writersPending_, 1);
while (true)
if (gUseFutexRWLock)
{
while (this->state_ != 0)
return this->LockWriteNSAbsUnlocked(uTimeout);
}
else
{
AU_LOCK_GUARD(this->mutex_);
return this->LockWriteNSAbsUnlocked(uTimeout);
}
}
template<bool bIsWriteRecursionAllowed>
bool RWLockImpl<bIsWriteRecursionAllowed>::LockWriteNSAbsSecondPath()
{
auto uOld = this->state_;
if (uOld < 0)
{
if (this->reentrantWriteLockHandle_ == GetThreadCookie())
{
AuInt64 iSecondTimeout = 0;
if (uTimeout)
{
iSecondTimeout = AuInt64(uTimeout) - AuTime::SteadyClockNS();
if (iSecondTimeout <= 0)
{
AuAtomicSub(&this->writersPending_, 1);
return false;
}
}
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
bool bStatus = this->GetConditionWriter().WaitForSignalNS(iSecondTimeout);
#else
bool bStatus = this->GetConditionWriter().WaitForSignalNsEx(&this->mutex_, iSecondTimeout);
#endif
if constexpr (bIsWriteRecursionAllowed)
{
if (this->state_ == 1)
{
this->GetConditionWriter().Broadcast();
}
}
if (!bStatus)
{
AuAtomicSub(&this->writersPending_, 1);
return false;
}
AuAtomicSub(&this->state_, 1);
return true;
}
if (AuAtomicCompareExchange(&this->state_, -1, 0) == 0)
}
else if (uOld == 0)
{
if (AuAtomicCompareExchange(&this->state_, -1, uOld) == uOld)
{
this->reentrantWriteLockHandle_ = GetThreadCookie();
AuAtomicSub(&this->writersPending_, 1);
return true;
}
}
return true;
return false;
}
template<bool bIsWriteRecursionAllowed>
@ -356,45 +377,110 @@ namespace Aurora::Threading::Primitives
}
}
AU_LOCK_GUARD(this->mutex_);
AuAtomicAdd(&this->writersPending_, 1);
if (gUseFutexRWLock)
{
AuInt64 uEndTime = uTimeout ? AuTime::SteadyClockNS() + uTimeout : 0;
AuInt64 uEndTime = uTimeout ? AuTime::SteadyClockNS() + uTimeout : 0;
return this->LockWriteNSAbsUnlocked(uEndTime);
}
else
{
AU_LOCK_GUARD(this->mutex_);
AuInt64 uEndTime = uTimeout ? AuTime::SteadyClockNS() + uTimeout : 0;
return this->LockWriteNSAbsUnlocked(uEndTime);
}
}
template<bool bIsWriteRecursionAllowed>
bool RWLockImpl<bIsWriteRecursionAllowed>::LockWriteNSAbsUnlocked(AuUInt64 qwTimeoutNS)
{
while (true)
{
while (this->state_ != 0)
AuInt32 iCurState;
while ((iCurState = this->state_) != 0)
{
AuInt64 uSecondTimeout = 0;
if (uTimeout)
bool bStatus {};
if (gUseFutexRWLock)
{
uSecondTimeout = uEndTime - AuTime::SteadyClockNS();
auto pSemaphore = this->GetFutexConditionWriter();
if (uSecondTimeout <= 0)
AuInt32 iCurState;
while ((iCurState = this->state_) != 0)
{
bool bStatusTwo {};
AuAtomicAdd(&this->writersPending_, 1);
static const AuUInt32 kExpect { 0 };
RWLOCK_REORDER_BARRIER();
if ((iCurState = this->state_) == 0)
{
bStatus = true;
bStatusTwo = true;
}
else
{
bStatus = WaitOnAddress(pSemaphore, &kExpect, sizeof(kExpect), qwTimeoutNS);
}
AuAtomicSub(&this->writersPending_, 1);
return false;
if (!bStatus)
{
break;
}
if (!bStatusTwo)
{
while (true)
{
auto uState = *pSemaphore;
if (uState == 0)
{
break;
}
if (AuAtomicCompareExchange(pSemaphore, uState - 1, uState) == uState)
{
break;
}
}
}
}
}
else
{
if (qwTimeoutNS)
{
uSecondTimeout = qwTimeoutNS - AuTime::SteadyClockNS();
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
bool bStatus = this->GetConditionWriter().WaitForSignalNS(uSecondTimeout);
#else
bool bStatus = this->GetConditionWriter().WaitForSignalNsEx(&this->mutex_, uSecondTimeout);
#endif
if (uSecondTimeout <= 0)
{
return false;
}
}
AuAtomicAdd(&this->writersPending_, 1);
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
bStatus = this->GetConditionWriter().WaitForSignalNS(uSecondTimeout);
#else
bStatus = this->GetConditionWriter().WaitForSignalNsEx(&this->mutex_, uSecondTimeout);
#endif
AuAtomicSub(&this->writersPending_, 1);
}
if constexpr (bIsWriteRecursionAllowed)
{
if (this->state_ == 1)
{
this->GetConditionWriter().Broadcast();
this->SignalManyWriter();
}
}
if (!bStatus)
{
AuAtomicSub(&this->writersPending_, 1);
return false;
}
}
@ -402,7 +488,6 @@ namespace Aurora::Threading::Primitives
if (AuAtomicCompareExchange(&this->state_, -1, 0) == 0)
{
this->reentrantWriteLockHandle_ = GetThreadCookie();
AuAtomicSub(&this->writersPending_, 1);
return true;
}
}
@ -410,6 +495,63 @@ namespace Aurora::Threading::Primitives
return true;
}
template<bool bIsWriteRecursionAllowed>
void RWLockImpl<bIsWriteRecursionAllowed>::SignalOneReader()
{
if (gUseFutexRWLock)
{
WakeOnAddress((const void *)&this->state_);
}
else
{
this->GetCondition().Signal();
}
}
template<bool bIsWriteRecursionAllowed>
void RWLockImpl<bIsWriteRecursionAllowed>::SignalOneWriter()
{
if (gUseFutexRWLock)
{
auto pThat = this->GetFutexConditionWriter();
AuAtomicAdd(pThat, 1u);
WakeOnAddress(pThat);
}
else
{
this->GetConditionWriter().Signal();
}
}
template<bool bIsWriteRecursionAllowed>
void RWLockImpl<bIsWriteRecursionAllowed>::SignalManyReader()
{
if (gUseFutexRWLock)
{
WakeAllOnAddress((const void *)&this->state_);
}
else
{
this->GetCondition().Broadcast();
}
}
template<bool bIsWriteRecursionAllowed>
void RWLockImpl<bIsWriteRecursionAllowed>::SignalManyWriter()
{
if (gUseFutexRWLock)
{
auto pThat = this->GetFutexConditionWriter();
AuUInt32 uCount = this->writersPending_;
AuAtomicAdd(pThat, uCount);
WakeNOnAddress(pThat, uCount);
}
else
{
this->GetConditionWriter().Broadcast();
}
}
template<bool bIsWriteRecursionAllowed>
bool RWLockImpl<bIsWriteRecursionAllowed>::TryLockRead()
{
@ -506,18 +648,24 @@ namespace Aurora::Threading::Primitives
{
bool bElevation {};
if (!gUseFutexRWLock)
{
AU_LOCK_GUARD(this->mutex_);
AU_LOCK_GUARD(this->mutex_); /* actually locking this->state_, out of branch. required for the mutually exclusive correctness of the condition. this is a fence. */
bElevation = this->writersPending_ > 0;
}
else
{
/* atomic read */
bElevation = this->writersPending_ > 0;
}
if (bElevation)
{
this->GetConditionWriter().Signal();
this->SignalOneWriter();
}
else
{
this->GetCondition().Broadcast();
this->SignalManyReader();
}
}
}
@ -531,19 +679,33 @@ namespace Aurora::Threading::Primitives
{
this->reentrantWriteLockHandle_ = 0;
if (!gUseFutexRWLock)
{
AU_LOCK_GUARD(this->mutex_);
#if defined(AURORA_COMPILER_MSVC)
this->state_ = 0;
#else
__sync_lock_release(&this->state_);
#endif
bElevationPending = this->writersPending_ > 0;
}
else
{
bElevationPending = this->writersPending_ > 0;
#if defined(AURORA_COMPILER_MSVC)
this->state_ = 0;
#else
__sync_lock_release(&this->state_);
#endif
}
if (bElevationPending)
{
this->GetConditionWriter().Signal();
this->SignalOneWriter();
}
else
{
this->GetCondition().Broadcast();
this->SignalManyReader();
}
}
else
@ -568,18 +730,23 @@ namespace Aurora::Threading::Primitives
if (val == 0)
{
if (!gUseFutexRWLock)
{
AU_LOCK_GUARD(this->mutex_);
bElevationPending = this->writersPending_ > 0;
}
else
{
bElevationPending = this->writersPending_ > 0;
}
if (bElevationPending)
{
this->GetConditionWriter().Signal();
this->SignalOneWriter();
}
else
{
this->GetCondition().Broadcast();
this->SignalManyReader();
}
}
}
@ -590,65 +757,158 @@ namespace Aurora::Threading::Primitives
{
if (this->state_ == 1)
{
AU_LOCK_GUARD(this->mutex_);
if (this->state_ == 1)
if (gUseFutexRWLock)
{
this->reentrantWriteLockHandle_ = GetThreadCookie();
this->state_ = -1;
return true;
if (this->UpgradeReadToWriteDoUpgrade())
{
return true;
}
}
else
{
AU_LOCK_GUARD(this->mutex_);
if (this->UpgradeReadToWriteDoUpgrade())
{
return true;
}
}
}
auto uEndTime = uTimeout ? AuTime::SteadyClockNS() + uTimeout : 0;
AU_LOCK_GUARD(this->mutex_);
AuAtomicAdd(&this->writersPending_, 1);
while (this->state_ != 1)
if (!gUseFutexRWLock)
{
AuInt64 iSecondTimeout {};
AU_LOCK_GUARD(this->mutex_);
AuAtomicAdd(&this->writersPending_, 1);
if (uTimeout)
while (this->state_ != 1)
{
iSecondTimeout = AuInt64(uEndTime) - AuTime::SteadyClockNS();
AuInt64 iSecondTimeout {};
if (iSecondTimeout <= 0)
if (uTimeout)
{
iSecondTimeout = AuInt64(uEndTime) - AuTime::SteadyClockNS();
if (iSecondTimeout <= 0)
{
AuAtomicSub(&this->writersPending_, 1);
return false;
}
}
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
if (!this->GetConditionWriter().WaitForSignalNS(iSecondTimeout))
#else
if (!this->GetConditionWriter().WaitForSignalNsEx(&this->mutex_, iSecondTimeout))
#endif
{
AuAtomicSub(&this->writersPending_, 1);
return false;
}
}
#if defined(AURWLOCK_NO_SIZE_OPTIMIZED_CONDVAR)
if (!this->GetConditionWriter().WaitForSignalNS(iSecondTimeout))
#else
if (!this->GetConditionWriter().WaitForSignalNsEx(&this->mutex_, iSecondTimeout))
#endif
AuAtomicSub(&this->writersPending_, 1);
return this->UpgradeReadToWriteDoUpgrade();
}
else
{
while (true)
{
return false;
auto pSemaphore = this->GetFutexConditionWriter();
AuInt32 iCurState;
while ((iCurState = this->state_) != 1)
{
bool bStatusTwo {};
bool bStatus {};
AuAtomicAdd(&this->writersPending_, 1);
static const AuUInt32 kExpect { 0 };
RWLOCK_REORDER_BARRIER();
if ((iCurState = this->state_) == 1)
{
bStatus = true;
bStatusTwo = true;
}
else
{
bStatus = WaitOnAddress(pSemaphore, &kExpect, sizeof(kExpect), uEndTime);
}
AuAtomicSub(&this->writersPending_, 1);
if (!bStatus)
{
return false;
}
if (!bStatusTwo)
{
while (true)
{
auto uState = *pSemaphore;
if (uState == 0)
{
break;
}
if (AuAtomicCompareExchange(pSemaphore, uState - 1, uState) == uState)
{
break;
}
}
}
}
if (this->UpgradeReadToWriteDoUpgrade())
{
return true;
}
}
}
AuAtomicSub(&this->writersPending_, 1);
this->reentrantWriteLockHandle_ = GetThreadCookie();
this->state_ = -1;
return true;
/* unreachable */
return false;
}
template<bool bIsWriteRecursionAllowed>
bool RWLockImpl<bIsWriteRecursionAllowed>::UpgradeReadToWriteDoUpgrade()
{
if (AuAtomicCompareExchange(&this->state_, -1, 1) == 1)
{
this->reentrantWriteLockHandle_ = GetThreadCookie();
return true;
}
else
{
return false;
}
}
template<bool bIsWriteRecursionAllowed>
bool RWLockImpl<bIsWriteRecursionAllowed>::DowngradeWriteToRead()
{
if (gUseFutexRWLock)
{
if (AuAtomicCompareExchange(&this->state_, 1, -1) == -1)
{
this->SignalManyReader();
return true;
}
}
AU_LOCK_GUARD(this->mutex_);
if (this->state_ != -1)
if (AuAtomicCompareExchange(&this->state_, 1, -1) == -1)
{
this->SignalManyReader();
return true;
}
else
{
return false;
}
this->state_ = 1;
this->GetCondition().Broadcast();
return true;
}
template<bool bIsWriteRecursionAllowed>

View File

@ -74,8 +74,13 @@ namespace Aurora::Threading::Primitives
auline void UnlockRead();// override;
auline void UnlockWrite();// override;
auline bool LockWriteNSAbsSecondPath();// override;
auline bool LockWriteNSAbsUnlocked(AuUInt64 qwTimeoutNS);// override;
bool UpgradeReadToWrite(AuUInt64 timeout) override;
bool DowngradeWriteToRead() override;
auline bool UpgradeReadToWriteDoUpgrade();
IWaitable *AsReadable() override;
IWaitable *AsWritable() override;
@ -83,6 +88,14 @@ namespace Aurora::Threading::Primitives
auline ConditionVariableInternal &GetCondition();
auline ConditionVariableInternal &GetConditionWriter();
auline AuUInt32 *GetFutexCondition();
auline AuUInt32 *GetFutexConditionWriter();
auline void SignalOneReader();
auline void SignalOneWriter();
auline void SignalManyReader();
auline void SignalManyWriter();
private:
RWLockAccessView<true, RWLockImpl> read_;

View File

@ -53,6 +53,8 @@ namespace Aurora::Threading
Primitives::InitAdaptiveThreshold();
}
AUKN_SYM bool IsWaitOnRecommended();
}
namespace Aurora::Threading::Primitives
@ -62,6 +64,8 @@ namespace Aurora::Threading::Primitives
auto uCores = AuHwInfo::GetCPUInfo().uThreads;
gSpinLinearPart = gRuntimeConfig.threadingConfig.uSpinLoopLinearBit;
gUseFutexRWLock = gRuntimeConfig.threadingConfig.bPreferFutexRWLock &&
IsWaitOnRecommended();
if (!gRuntimeConfig.threadingConfig.bForceEnableAdaptiveSpin)
{
@ -69,7 +73,11 @@ namespace Aurora::Threading::Primitives
return;
}
if (uCores >= 16)
if (uCores == 1)
{
gSpinAdaptiveThreshold = 0;
}
else if (uCores >= 16)
{
gSpinAdaptiveThreshold = uCores / gRuntimeConfig.threadingConfig.uAdaptiveSpinCUCnt16;
}

View File

@ -19,6 +19,8 @@ namespace Aurora::Threading::Primitives
inline AuUInt32 gSpinAdaptiveCurrentCount {};
inline AuUInt32 gSpinLinearPart {};
inline AuUInt32 gUseFutexRWLock {};
void InitAdaptiveThreshold();
void InitAdaptiveThresholdFirstTime();