diff --git a/Include/Aurora/IO/Net/INetSrvSockets.hpp b/Include/Aurora/IO/Net/INetSrvSockets.hpp index 22320d81..d5d6daf4 100644 --- a/Include/Aurora/IO/Net/INetSrvSockets.hpp +++ b/Include/Aurora/IO/Net/INetSrvSockets.hpp @@ -36,6 +36,7 @@ namespace Aurora::IO::Net AuSPtr pDriver; AuUInt uMaxConnections {}; AuUInt uMaxAcceptBacklog {}; + bool bMultiThreadTCP {}; }; struct NetSocketBindEx : NetSocketBind diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 0802c4b7..321eba98 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -312,7 +312,7 @@ namespace Aurora::Async && (asyncLoop->WaitAny(0)) ) { - PollInternal(block); + PollInternal(false); success = true; } else @@ -438,6 +438,13 @@ namespace Aurora::Async break; } + if (state->pendingWorkItems.empty() && ( + (this->GetThreadState()->asyncLoop->GetSourceCount() > 1) || + this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek())) + { + return false; + } + } while (state->pendingWorkItems.empty()); if (group->workQueue.empty()) @@ -640,7 +647,7 @@ namespace Aurora::Async worker->shuttingdown = true; } - if (groupId != 0) + if (!group->IsSysThread()) { worker->threadObject->SendExitSignal(); threads.push_back(worker->threadObject); @@ -941,11 +948,18 @@ namespace Aurora::Async } auto threadState = AuMakeShared(); + if (!threadState) + { + SysPushErrorMem(); + return {}; + } + threadState->parent = group; threadState->running = AuThreadPrimitives::EventUnique(true, false, true); threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0); threadState->id = workerId; - threadState->asyncLoop = AuStaticCast(AuLoop::NewLoopQueue()); + threadState->asyncLoop = AuMakeShared(); + threadState->asyncLoop->pParent = threadState.get(); threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds threadState->runMode = ERunMode::eEfficient; diff --git a/Source/Async/ThreadWorkerQueueShim.cpp b/Source/Async/ThreadWorkerQueueShim.cpp index 9ba7c085..2f5fdbe2 100644 --- a/Source/Async/ThreadWorkerQueueShim.cpp +++ b/Source/Async/ThreadWorkerQueueShim.cpp @@ -27,7 +27,7 @@ namespace Aurora::Async auto ret = LoopQueue::AddCallback(source, subscriber); if (ret) { - this->commitPending_ = true; + Schedule(); } return ret; } @@ -37,7 +37,7 @@ namespace Aurora::Async auto ret = LoopQueue::AddCallbackEx(source, subscriber); if (ret) { - this->commitPending_ = true; + Schedule(); } return ret; } @@ -47,14 +47,29 @@ namespace Aurora::Async auto ret = LoopQueue::AddCallback(subscriber); if (ret) { - this->commitPending_ = true; + Schedule(); } return ret; } - bool AsyncLoop::Commit() + void AsyncLoop::Schedule() { this->commitPending_ = true; + + if (AuThreads::GetThread() != this->pParent->threadObject.get()) + { + this->pParent->parent.lock()->cvVariable->Broadcast(); + } + } + + bool AsyncLoop::CommitPending() + { + return this->commitPending_; + } + + bool AsyncLoop::Commit() + { + Schedule(); return true; } } \ No newline at end of file diff --git a/Source/Async/ThreadWorkerQueueShim.hpp b/Source/Async/ThreadWorkerQueueShim.hpp index 61cda610..cad09f9a 100644 --- a/Source/Async/ThreadWorkerQueueShim.hpp +++ b/Source/Async/ThreadWorkerQueueShim.hpp @@ -14,6 +14,7 @@ namespace Aurora::Async { struct AsyncLoop : AuLoop::LoopQueue { + ThreadState *pParent; void OnFrame(); virtual bool AddCallback (const AuSPtr &source, const AuSPtr &subscriber) override; @@ -21,6 +22,9 @@ namespace Aurora::Async virtual bool AddCallback (const AuSPtr &subscriber) override; virtual bool Commit () override; + void Schedule(); + bool CommitPending(); + private: bool commitPending_ {}; }; diff --git a/Source/IO/IOProcessor.cpp b/Source/IO/IOProcessor.cpp index d41f9af0..2439fbcb 100644 --- a/Source/IO/IOProcessor.cpp +++ b/Source/IO/IOProcessor.cpp @@ -19,12 +19,14 @@ namespace Aurora::IO IOProcessor::IOProcessor(AuUInt threadId, bool bTickOnly, AuAsync::WorkerPId_t worker, - const AuSPtr &pLoopQueue) : + const AuSPtr &pLoopQueue, + bool bIsNoQueue) : mutliplexIOAndTimer(!bTickOnly), pLoopQueue(pLoopQueue), asyncWorker(worker), threadId(threadId), - streamProcessors(this) + streamProcessors(this), + bIsNoQueue(bIsNoQueue) { } @@ -54,6 +56,21 @@ namespace Aurora::IO } this->ToQueue()->SourceAdd(this->items.cvEvent); + if (!this->bIsNoQueue) + { + auto pAS = AuMakeShared([pThat = AuSharedFromThis()](const AuSPtr &pSource) -> bool + { + pThat->ManualTick(); + return false; + }); + + if (!this->ToQueue()->AddCallback(this->items.cvEvent, pAS)) + { + return {}; + } + + } + this->ToQueue()->Commit(); return true; } @@ -461,7 +478,9 @@ namespace Aurora::IO { AU_LOCK_GUARD(this->items.mutex); // < critical section / reentrant | can nest submission this->items.cvEvent->Set(); - return AuTryInsert(this->workUnits, work); + auto bRet = AuTryInsert(this->workUnits, work); + //this->pLoopQueue->Commit(); // required to wake up async threads + return bRet; } bool IOProcessor::AddEventListener(const AuSPtr &eventListener) @@ -659,6 +678,12 @@ namespace Aurora::IO } auto item = AuMakeShared(); + if (!item) + { + SysPushErrorMemory(); + return {}; + } + item->pParent = this; item->pListener = pListener; item->pItem = pItem; @@ -849,10 +874,28 @@ namespace Aurora::IO return this->threadId == AuThreads::GetThreadId(); } + + static AuSPtr NewIOProcessorEx(bool tickOnly, const AuSPtr &queue, bool bIsNoQueue) + { + auto processor = AuMakeShared(0, tickOnly, AuAsync::WorkerPId_t {}, queue, bIsNoQueue); + if (!processor) + { + SysPushErrorMem(); + return {}; + } + + if (!processor->Init()) + { + SysPushErrorNested(); + return {}; + } + + return processor; + } AUKN_SYM AuSPtr NewIOProcessor(bool tickOnly, const AuSPtr &queue) { - auto processor = AuMakeShared(0, tickOnly, AuAsync::WorkerPId_t {}, queue); + auto processor = AuMakeShared(0, tickOnly, AuAsync::WorkerPId_t {}, queue, false); if (!processor) { SysPushErrorMem(); @@ -889,7 +932,7 @@ namespace Aurora::IO return {}; } - auto processor = AuMakeShared(AuUInt(thread.get()), tickOnly, id, queue); + auto processor = AuMakeShared(AuUInt(thread.get()), tickOnly, id, queue, false); if (!processor) { SysPushErrorMem(); @@ -914,6 +957,6 @@ namespace Aurora::IO return {}; } - return NewIOProcessor(tickOnly, loop); + return NewIOProcessorEx(tickOnly, loop, true); } } \ No newline at end of file diff --git a/Source/IO/IOProcessor.hpp b/Source/IO/IOProcessor.hpp index 613f054b..72d14512 100644 --- a/Source/IO/IOProcessor.hpp +++ b/Source/IO/IOProcessor.hpp @@ -16,7 +16,7 @@ namespace Aurora::IO struct IOProcessor : IIOProcessor, AuEnableSharedFromThis, AuAsync::IWorkItemHandler { ~IOProcessor(); - IOProcessor(AuUInt threadId, bool tickOnly, AuAsync::WorkerPId_t worker, const AuSPtr &loop); + IOProcessor(AuUInt threadId, bool tickOnly, AuAsync::WorkerPId_t worker, const AuSPtr &loop, bool bIsNoQueue); bool Init(); @@ -91,6 +91,7 @@ namespace Aurora::IO AuUInt64 refreshRateNs {}; AuUInt64 minFrameDeltaNs {}; + bool bIsNoQueue; void UpdateTimers(); diff --git a/Source/IO/Loop/Loop.cpp b/Source/IO/Loop/Loop.cpp index 1f016718..b9f54d61 100644 --- a/Source/IO/Loop/Loop.cpp +++ b/Source/IO/Loop/Loop.cpp @@ -24,14 +24,15 @@ namespace Aurora::IO::Loop } #endif - AUKN_SYM AuSPtr NewLSFile(const AuSPtr &fileTransaction) + AUKN_SYM AuSPtr NewLSFile(const AuSPtr &pFileTransaction) { - if (!fileTransaction) + if (!pFileTransaction) { SysPushErrorArg(); return {}; } - return fileTransaction->NewLoopSource(); + + return pFileTransaction->NewLoopSource(); } AUKN_SYM AuSPtr NewStdIn() diff --git a/Source/IO/Net/AuNetDatagramServer.NT.cpp b/Source/IO/Net/AuNetDatagramServer.NT.cpp index 91820587..896d81f5 100644 --- a/Source/IO/Net/AuNetDatagramServer.NT.cpp +++ b/Source/IO/Net/AuNetDatagramServer.NT.cpp @@ -21,7 +21,8 @@ namespace Aurora::IO::Net pWorker, {}, {}, - 4096), + 4096, + true), Socket(pInterface, pWorker, {}, -1) { } diff --git a/Source/IO/Net/AuNetDatagramServer.cpp b/Source/IO/Net/AuNetDatagramServer.cpp index 438f7019..adae39f2 100644 --- a/Source/IO/Net/AuNetDatagramServer.cpp +++ b/Source/IO/Net/AuNetDatagramServer.cpp @@ -8,6 +8,7 @@ #include "Networking.hpp" #include "AuNetDatagramServer.hpp" #include "AuNetEndpoint.hpp" +#include "AuNetError.hpp" #if defined(AURORA_IS_MODERNNT_DERIVED) #include "AuNetStream.NT.hpp" @@ -24,7 +25,8 @@ namespace Aurora::IO::Net pWorker, {}, {}, - 4096), + 4096, + true), pDriver_(pDriver) { @@ -47,8 +49,55 @@ namespace Aurora::IO::Net } } - void DatagramServer::SendPacket(const NetEndpoint &destination, const AuSPtr &pPacketData, const AuSPtr> &pCalback) + void DatagramServer::SendPacket(const NetEndpoint &destination, + const AuSPtr &pPacketData, + const AuSPtr> &pCalback2) { + AuSPtr pCallback; + + if (pCalback2) + { + pCallback = AuMakeShared([uSourceLength = pPacketData->length, + pCalback2 = AuSPtr>(pCalback2)](AuUInt64 uOffset, AuUInt32 uLength) + { + if (uSourceLength != uLength) + { + AuNet::NetError er; + er.netError = AuNet::ENetworkError::eSocketBufferOverflow; + pCalback2->OnFailure(&er); + return; + } + + pCalback2->OnSuccess((void *)nullptr); + }); + } + + auto pTransaction = AuMakeShared(this); + if (!pTransaction || (pCalback2 && !pCallback)) + { + SysPushErrorMemory("Dropping datagram"); + + if (pCalback2) + { + AuNet::NetError er; + er.netError = AuNet::ENetworkError::eResourceConstraint; + pCalback2->OnFailure(&er); + } + + return; + } + + pTransaction->bDatagramMode = true; + AuMemcpy(&pTransaction->netEndpoint, &destination, sizeof(NetEndpoint)); + + pTransaction->SetCallback(pCallback); + + if (!pTransaction->StartWrite(0, pPacketData)) + { + NetError error; + NetError_SetCurrent(error); + pCalback2->OnFailure(&error); + } } AuSPtr DatagramServer::GetDatagramDriver() @@ -63,14 +112,14 @@ namespace Aurora::IO::Net DeoptimizeEndpoint(endpoint); endpoint.transportProtocol = ETransportProtocol::eProtocolUDP; - auto pByteByffer = this->socketChannel_.AsReadableByteBuffer(); - auto pEndRead = pByteByffer->writePtr; + auto pByteBuffer = this->socketChannel_.AsReadableByteBuffer(); + auto pEndRead = pByteBuffer->writePtr; if (this->pDriver_) { try { - this->pDriver_->OnPacket(endpoint, pByteByffer); + this->pDriver_->OnPacket(endpoint, pByteBuffer); } catch (...) { @@ -83,7 +132,7 @@ namespace Aurora::IO::Net } } - pByteByffer->readPtr = pEndRead; + pByteBuffer->readPtr = pEndRead; #endif } diff --git a/Source/IO/Net/AuNetSocketServer.NT.cpp b/Source/IO/Net/AuNetSocketServer.NT.cpp index 4013bb77..49a4fb12 100644 --- a/Source/IO/Net/AuNetSocketServer.NT.cpp +++ b/Source/IO/Net/AuNetSocketServer.NT.cpp @@ -16,12 +16,14 @@ namespace Aurora::IO::Net NetWorker *pWorker, const AuSPtr &pDriver, const AuSPtr &pSocketDriverFactory, - AuUInt32 maxConnections) : + AuUInt32 maxConnections, + bool bMultiThreadTCP) : SocketServer(pInterface, pWorker, pDriver, pSocketDriverFactory, - maxConnections), + maxConnections, + bMultiThreadTCP), Socket(pInterface, pWorker, AuSPtr{}, diff --git a/Source/IO/Net/AuNetSocketServer.NT.hpp b/Source/IO/Net/AuNetSocketServer.NT.hpp index f17cf73d..6a579ad9 100644 --- a/Source/IO/Net/AuNetSocketServer.NT.hpp +++ b/Source/IO/Net/AuNetSocketServer.NT.hpp @@ -11,13 +11,14 @@ namespace Aurora::IO::Net { - struct SocketServerImpl : SocketServer + struct SocketServerImpl : SocketServer { SocketServerImpl(struct NetInterface *pInterface, struct NetWorker *pWorker, const AuSPtr &pDriver, const AuSPtr &pSocketDriverFactory, - AuUInt32 maxConnections); + AuUInt32 maxConnections, + bool bMultiThreadTCP); virtual void DoNonblockingReadTick() override; @@ -31,6 +32,5 @@ namespace Aurora::IO::Net protected: SocketServerAcceptReadOperation acceptOperation_; AuSPtr pSocketDriverFactory_; - }; } \ No newline at end of file diff --git a/Source/IO/Net/AuNetSocketServer.cpp b/Source/IO/Net/AuNetSocketServer.cpp index 73472f6f..b805c939 100644 --- a/Source/IO/Net/AuNetSocketServer.cpp +++ b/Source/IO/Net/AuNetSocketServer.cpp @@ -17,14 +17,16 @@ namespace Aurora::IO::Net NetWorker *pWorker, const AuSPtr &pDriver, const AuSPtr &pFactory, - AuUInt32 maxConnections) + AuUInt32 maxConnections, + bool bMultiThreadTCP) : Socket(pInterface, pWorker, AuSPtr{}, -1), pDriver_(pDriver), pFactory_(pFactory), - uMaxConnections_(maxConnections) + uMaxConnections_(maxConnections), + bMultiThreadTCP(bMultiThreadTCP) { } diff --git a/Source/IO/Net/AuNetSocketServer.hpp b/Source/IO/Net/AuNetSocketServer.hpp index 184eacf4..a1b57db9 100644 --- a/Source/IO/Net/AuNetSocketServer.hpp +++ b/Source/IO/Net/AuNetSocketServer.hpp @@ -23,7 +23,8 @@ namespace Aurora::IO::Net struct NetWorker *pWorker, const AuSPtr &pDriver, const AuSPtr &pFactory, - AuUInt32 maxConnections); + AuUInt32 maxConnections, + bool bMultiThreadTCP); void Init(const NetEndpoint &localAddress); void Listen(const NetEndpoint &localAddress, bool bBind = true, bool bListen = true); @@ -46,6 +47,8 @@ namespace Aurora::IO::Net virtual bool BeginAcceptLoop() = 0; virtual void DetroyServer() = 0; + + const bool bMultiThreadTCP; protected: // INTERFACE: base os socket diff --git a/Source/IO/Net/AuNetSocketServerAcceptReadOperation.NT.cpp b/Source/IO/Net/AuNetSocketServerAcceptReadOperation.NT.cpp index db868074..3696fa1f 100644 --- a/Source/IO/Net/AuNetSocketServerAcceptReadOperation.NT.cpp +++ b/Source/IO/Net/AuNetSocketServerAcceptReadOperation.NT.cpp @@ -9,6 +9,9 @@ #include "AuNetSocketServer.hpp" #include "AuNetSocket.hpp" #include "AuNetEndpoint.hpp" +#include "AuNetInterface.hpp" +#include "AuNetWorker.hpp" +#include "AuNetError.hpp" namespace Aurora::IO::Net { @@ -27,24 +30,71 @@ namespace Aurora::IO::Net this->InitOnce(); return SocketServerAcceptReadOperationBase::IsValid() && - bool(lpfnAcceptEx); + bool(lpfnAcceptEx); } void SocketServerAcceptReadOperation::OnOverlappedComplete() { + if (!this->nextSocketPtr) + { + return; + } + SOCKET hListenHandle = (SOCKET)this->pParent_->ToPlatformHandle(); int ret = ::setsockopt(this->nextSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&hListenHandle, sizeof(SOCKET)); - int error = WSAGetLastError(); - //AuLogDbg("Error {}", error); - SysAssert(ret != -1); + auto dwError = WSAGetLastError(); + if (ret == -1) + { + SysPushErrorNet("Couldn't enable socket after overlapped accept"); + NetError error; + error.osError = dwError; + this->nextSocketPtr->SendErrorNoStream(error); + return; + } UpdateNextSocketAddresses(); - this->nextSocketPtr->DoMain(); + if (this->pParent_->bMultiThreadTCP) + { + auto pCallback = AuMakeShared([socket = this->nextSocketPtr]() + { + if (!socket->SendPreestablish()) + { + SysPushErrorNet("Couldn't preestablish next socket"); + socket->SendErrorNoStream({}); + return; + } + + socket->DoMain(); + }, + []() + { + + }); + + if (!pCallback) + { + SysPushErrorMemory(); + SysPushErrorNet("Memory"); + this->nextSocketPtr->SendErrorNoStream({}); + return; + } + + if (!this->nextSocketPtr->ToWorker()->ToProcessor()->SubmitIOWorkItem(pCallback)) + { + SysPushErrorMemory(); + SysPushErrorNet("couldnt schedule read tick on thread"); + this->nextSocketPtr->SendErrorNoStream({}); + } + } + else + { + this->nextSocketPtr->DoMain(); + } // accept next this->pParent_->ScheduleAcceptTick(); // We **cannot** readd the current event in the trigger callback @@ -137,6 +187,12 @@ namespace Aurora::IO::Net WSA_FLAG_OVERLAPPED ); + if (nextSocket == INVALID_SOCKET) + { + SysPushErrorNet("No socket"); + return false; + } + auto pFactory = this->pParent_->GetFactory(); if (!bool(pFactory)) { @@ -151,8 +207,18 @@ namespace Aurora::IO::Net return false; } + NetWorker *pWorker; + if (this->pParent_->bMultiThreadTCP) + { + pWorker = this->pInterface_->TryScheduleEx().get(); + } + else + { + pWorker = this->pParent_->ToWorkerEx(); + } + nextSocketPtr = AuMakeShared(this->pInterface_, - this->pParent_->ToWorkerEx(), + pWorker, pNewDriver, (AuUInt)nextSocket); if (!bool(nextSocketPtr)) @@ -162,13 +228,20 @@ namespace Aurora::IO::Net return false; } - if (!nextSocketPtr->SendPreestablish()) + if (this->pParent_->bMultiThreadTCP) { - SysPushErrorNet("Couldn't preestablish next socket"); - return false; + // Defer SendPreestablish until we're done to minimize RPCs + } + else + { + if (!nextSocketPtr->SendPreestablish()) + { + SysPushErrorNet("Couldn't preestablish next socket"); + return false; + } } - return nextSocket != INVALID_SOCKET; + return true; } void SocketServerAcceptReadOperation::UpdateNextSocketAddresses() diff --git a/Source/IO/Net/AuNetSocketServerAcceptReadOperation.Unix.cpp b/Source/IO/Net/AuNetSocketServerAcceptReadOperation.Unix.cpp index 1d490477..ebf4673b 100644 --- a/Source/IO/Net/AuNetSocketServerAcceptReadOperation.Unix.cpp +++ b/Source/IO/Net/AuNetSocketServerAcceptReadOperation.Unix.cpp @@ -80,9 +80,19 @@ namespace Aurora::IO::Net return false; } + NetWorker *pWorker; + if (this->pParent_->bMultiThreadTCP) + { + pWorker = this->pInterface_->TryScheduleEx().get(); + } + else + { + pWorker = this->pParent_->ToWorkerEx(); + } + // Create socket this->nextSocketPtr = AuMakeShared(this->pInterface_, - this->pParent_->ToWorkerEx(), + pWorker, pNewDriver, (AuUInt)this->nextSocket); if (!bool(this->nextSocketPtr)) @@ -92,14 +102,7 @@ namespace Aurora::IO::Net ::close(this->nextSocket); return false; } - - // Preinit - if (!nextSocketPtr->SendPreestablish()) - { - SysPushErrorNet("Couldn't preestablish next socket"); - return false; - } - + // brrrrrrrrrr AuMemcpy(this->nextSocketPtr->remoteEndpoint_.hint, addressRemote, addressLength); this->nextSocketPtr->UpdateLocalEndpoint(); @@ -107,9 +110,53 @@ namespace Aurora::IO::Net // Deoptimize (sockaddr -> binary) endpoints UpdateNextSocketAddresses(); + + if (this->pParent_->bMultiThreadTCP) + { + auto pCallback = AuMakeShared([=]() + { + // Preinit + if (!nextSocketPtr->SendPreestablish()) + { + SysPushErrorNet("Couldn't establish TCP socket on alternative thread"); + nextSocketPtr->CloseSocket(); + return; + } - // Start the channels - this->nextSocketPtr->DoMain(); + // Start the channels + this->nextSocketPtr->DoMain(); + }, + []() + { + + }); + + if (!pCallback) + { + SysPushErrorMemory(); + SysPushErrorNet("Memory"); + return false; + } + + if (!pWorker->ToProcessor()->SubmitIOWorkItem(pCallback)) + { + SysPushErrorMemory(); + SysPushErrorNet("couldnt schedule accept on thread"); + return false; + } + } + else + { + // Preinit + if (!nextSocketPtr->SendPreestablish()) + { + SysPushErrorNet("Couldn't preestablish next socket"); + return false; + } + + // Start the channels + this->nextSocketPtr->DoMain(); + } // And we don't need this reference anymore... this->nextSocketPtr.reset(); diff --git a/Source/IO/Net/AuNetSrvSockets.cpp b/Source/IO/Net/AuNetSrvSockets.cpp index 32119d5c..cb28f681 100644 --- a/Source/IO/Net/AuNetSrvSockets.cpp +++ b/Source/IO/Net/AuNetSrvSockets.cpp @@ -135,7 +135,8 @@ namespace Aurora::IO::Net pWorker.get(), netBind.pDriver, netBind.pFactory, - uMaxSockets); + uMaxSockets, + netBind.bMultiThreadTCP); if (!pSocket) { SysPushErrorNet("No Memory"); @@ -146,7 +147,6 @@ namespace Aurora::IO::Net { pSocket->FinishConstructAsync(); - NetEndpoint endpoint; endpoint.ip = netBind.ip; endpoint.uPort = netBind.uPort; diff --git a/Source/Threading/Primitives/AuRWLock.cpp b/Source/Threading/Primitives/AuRWLock.cpp index 6772dc9b..0c910ef3 100644 --- a/Source/Threading/Primitives/AuRWLock.cpp +++ b/Source/Threading/Primitives/AuRWLock.cpp @@ -135,7 +135,8 @@ namespace Aurora::Threading::Primitives } } } - while (AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState); + while (iCurState == -1 || + AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState); #endif return true; @@ -203,7 +204,7 @@ namespace Aurora::Threading::Primitives return; } - auto val = --this->state_; + auto val = AuAtomicSub(&this->state_, 1); if ((val == 1) && (this->bElevaterPending_)) { @@ -221,6 +222,7 @@ namespace Aurora::Threading::Primitives AU_LOCK_GUARD(this->mutex_); this->state_ = 0; this->condition_->Broadcast(); + this->reentrantWriteLockHandle_ = 0; } bool RWLockImpl::UpgradeReadToWrite(AuUInt64 timeout) @@ -252,8 +254,8 @@ namespace Aurora::Threading::Primitives return false; } - this->condition_->Broadcast(); this->state_ = 1; + this->condition_->Broadcast(); return true; }