diff --git a/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp b/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp index 421d23cf..671c6634 100644 --- a/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp +++ b/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp @@ -91,6 +91,9 @@ namespace Aurora::IO::Adapters bool bIsStream {}; bool bFlushOnWrite { true }; + AuCriticalSection mutex; + AuSPtr pCompletionGroup; + AuUInt64 readOffset {}; AuUInt64 writeOffset {}; @@ -150,19 +153,25 @@ namespace Aurora::IO::Adapters AuUInt64 AsyncStreamAdapter::SetReadOffset(AuUInt64 offset) { + AU_LOCK_GUARD(this->mutex); + if (this->locked == 1) { this->writer.Preframe(); } + return AuExchange(this->readOffset, offset); } AuUInt64 AsyncStreamAdapter::GetWriteOffset() { + AU_LOCK_GUARD(this->mutex); + if (this->locked == 1) { this->writer.Preframe(); } + return this->writeOffset; } @@ -179,6 +188,7 @@ namespace Aurora::IO::Adapters this->reader.parent = this; this->writer.parent = this; this->bIsStream = bIsStream; + this->pCompletionGroup = this->transaction->GetCompletionGroup(); return true; } @@ -218,6 +228,8 @@ namespace Aurora::IO::Adapters EStreamError AsyncStreamReader::BeginRead(const AuSPtr &internalView) { + AU_LOCK_GUARD(this->parent->mutex); + if (parent->lastAllocation) { auto uLength = parent->transaction->GetLastPacketLength(); @@ -250,6 +262,11 @@ namespace Aurora::IO::Adapters parent->lastAllocation->streamIndex = 0; + if (this->parent->pCompletionGroup) + { + (void)parent->transaction->TryAttachToCompletionGroup(this->parent->pCompletionGroup); + } + if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation)) { parent->bAsyncActive = false; @@ -262,6 +279,8 @@ namespace Aurora::IO::Adapters EStreamError AsyncStreamReader::Dequeue(AuUInt uReqLength, Memory::MemoryViewWrite &out) { + AU_LOCK_GUARD(this->parent->mutex); + out = {}; if (!parent->transaction->Complete()) @@ -335,6 +354,8 @@ namespace Aurora::IO::Adapters EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite ¶meters) { + AU_LOCK_GUARD(this->parent->mutex); + if (!parameters.length) { SysPushErrorArg(); @@ -411,6 +432,11 @@ namespace Aurora::IO::Adapters parent->lastAllocation = parent->AllocateNextPageCached(parameters.length); parent->lastAllocation->streamIndex = 0; + if (this->parent->pCompletionGroup) + { + (void)parent->transaction->TryAttachToCompletionGroup(this->parent->pCompletionGroup); + } + if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation)) { parent->bAsyncActive = false; @@ -428,7 +454,7 @@ namespace Aurora::IO::Adapters AsyncStreamWriter::~AsyncStreamWriter() { - if (used) + if (this->used) { Flush(); } @@ -443,7 +469,9 @@ namespace Aurora::IO::Adapters EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead ¶meters) { - used = true; + AU_LOCK_GUARD(this->parent->mutex); + + this->used = true; if (!parameters.ptr) { @@ -492,6 +520,8 @@ namespace Aurora::IO::Adapters void AsyncStreamWriter::Flush() { + AU_LOCK_GUARD(this->parent->mutex); + Preframe(); Frame(); } @@ -516,6 +546,7 @@ namespace Aurora::IO::Adapters parent->transaction->GetLastPacketLength(); } + parent->bAsyncActive = false; parent->transaction->Reset(); } } @@ -524,6 +555,11 @@ namespace Aurora::IO::Adapters { AuSPtr buffer; + if (parent->bAsyncActive) + { + return; + } + if (this->writesPending.size() == 1) { buffer = AuMove(this->writesPending[0]); @@ -599,6 +635,8 @@ namespace Aurora::IO::Adapters AuSPtr AsyncStreamAdapter::ToStreamReader() { + AU_LOCK_GUARD(this->mutex); + if (this->locked != 0 && this->locked != 2) { return {}; @@ -611,6 +649,8 @@ namespace Aurora::IO::Adapters AuSPtr AsyncStreamAdapter::ToStreamWriter() { + AU_LOCK_GUARD(this->mutex); + if (this->locked != 0 && this->locked != 1) { return {}; @@ -623,6 +663,8 @@ namespace Aurora::IO::Adapters AuSPtr AsyncStreamAdapter::ToWaitable() { + AU_LOCK_GUARD(this->mutex); + if (auto pGroup = this->transaction->GetCompletionGroup()) { this->source2.SetGroup(pGroup); @@ -643,6 +685,8 @@ namespace Aurora::IO::Adapters void AsyncStreamAdapter::ReserveBuffer(AuUInt64 uLength) { + AU_LOCK_GUARD(this->mutex); + if (!this->lastAllocation || !this->bAsyncActive) { this->lastAllocation = this->AllocateNextPageCached(uLength); @@ -651,6 +695,8 @@ namespace Aurora::IO::Adapters bool AsyncStreamAdapter::Reset() { + AU_LOCK_GUARD(this->mutex); + if (this->locked == 1) { if (this->writer.HasWorkItems())