[*] Harden AuIOAdapterAsyncStream

This commit is contained in:
Reece Wilson 2024-04-21 13:04:06 +01:00
parent c26eaf86a7
commit 579185ec10

View File

@ -91,6 +91,9 @@ namespace Aurora::IO::Adapters
bool bIsStream {}; bool bIsStream {};
bool bFlushOnWrite { true }; bool bFlushOnWrite { true };
AuCriticalSection mutex;
AuSPtr<CompletionGroup::ICompletionGroup> pCompletionGroup;
AuUInt64 readOffset {}; AuUInt64 readOffset {};
AuUInt64 writeOffset {}; AuUInt64 writeOffset {};
@ -150,19 +153,25 @@ namespace Aurora::IO::Adapters
AuUInt64 AsyncStreamAdapter::SetReadOffset(AuUInt64 offset) AuUInt64 AsyncStreamAdapter::SetReadOffset(AuUInt64 offset)
{ {
AU_LOCK_GUARD(this->mutex);
if (this->locked == 1) if (this->locked == 1)
{ {
this->writer.Preframe(); this->writer.Preframe();
} }
return AuExchange(this->readOffset, offset); return AuExchange(this->readOffset, offset);
} }
AuUInt64 AsyncStreamAdapter::GetWriteOffset() AuUInt64 AsyncStreamAdapter::GetWriteOffset()
{ {
AU_LOCK_GUARD(this->mutex);
if (this->locked == 1) if (this->locked == 1)
{ {
this->writer.Preframe(); this->writer.Preframe();
} }
return this->writeOffset; return this->writeOffset;
} }
@ -179,6 +188,7 @@ namespace Aurora::IO::Adapters
this->reader.parent = this; this->reader.parent = this;
this->writer.parent = this; this->writer.parent = this;
this->bIsStream = bIsStream; this->bIsStream = bIsStream;
this->pCompletionGroup = this->transaction->GetCompletionGroup();
return true; return true;
} }
@ -218,6 +228,8 @@ namespace Aurora::IO::Adapters
EStreamError AsyncStreamReader::BeginRead(const AuSPtr<Memory::MemoryViewWrite> &internalView) EStreamError AsyncStreamReader::BeginRead(const AuSPtr<Memory::MemoryViewWrite> &internalView)
{ {
AU_LOCK_GUARD(this->parent->mutex);
if (parent->lastAllocation) if (parent->lastAllocation)
{ {
auto uLength = parent->transaction->GetLastPacketLength(); auto uLength = parent->transaction->GetLastPacketLength();
@ -250,6 +262,11 @@ namespace Aurora::IO::Adapters
parent->lastAllocation->streamIndex = 0; parent->lastAllocation->streamIndex = 0;
if (this->parent->pCompletionGroup)
{
(void)parent->transaction->TryAttachToCompletionGroup(this->parent->pCompletionGroup);
}
if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation)) if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation))
{ {
parent->bAsyncActive = false; parent->bAsyncActive = false;
@ -262,6 +279,8 @@ namespace Aurora::IO::Adapters
EStreamError AsyncStreamReader::Dequeue(AuUInt uReqLength, Memory::MemoryViewWrite &out) EStreamError AsyncStreamReader::Dequeue(AuUInt uReqLength, Memory::MemoryViewWrite &out)
{ {
AU_LOCK_GUARD(this->parent->mutex);
out = {}; out = {};
if (!parent->transaction->Complete()) if (!parent->transaction->Complete())
@ -335,6 +354,8 @@ namespace Aurora::IO::Adapters
EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite &parameters) EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{ {
AU_LOCK_GUARD(this->parent->mutex);
if (!parameters.length) if (!parameters.length)
{ {
SysPushErrorArg(); SysPushErrorArg();
@ -411,6 +432,11 @@ namespace Aurora::IO::Adapters
parent->lastAllocation = parent->AllocateNextPageCached(parameters.length); parent->lastAllocation = parent->AllocateNextPageCached(parameters.length);
parent->lastAllocation->streamIndex = 0; parent->lastAllocation->streamIndex = 0;
if (this->parent->pCompletionGroup)
{
(void)parent->transaction->TryAttachToCompletionGroup(this->parent->pCompletionGroup);
}
if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation)) if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation))
{ {
parent->bAsyncActive = false; parent->bAsyncActive = false;
@ -428,7 +454,7 @@ namespace Aurora::IO::Adapters
AsyncStreamWriter::~AsyncStreamWriter() AsyncStreamWriter::~AsyncStreamWriter()
{ {
if (used) if (this->used)
{ {
Flush(); Flush();
} }
@ -443,7 +469,9 @@ namespace Aurora::IO::Adapters
EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead &parameters) EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead &parameters)
{ {
used = true; AU_LOCK_GUARD(this->parent->mutex);
this->used = true;
if (!parameters.ptr) if (!parameters.ptr)
{ {
@ -492,6 +520,8 @@ namespace Aurora::IO::Adapters
void AsyncStreamWriter::Flush() void AsyncStreamWriter::Flush()
{ {
AU_LOCK_GUARD(this->parent->mutex);
Preframe(); Preframe();
Frame(); Frame();
} }
@ -516,6 +546,7 @@ namespace Aurora::IO::Adapters
parent->transaction->GetLastPacketLength(); parent->transaction->GetLastPacketLength();
} }
parent->bAsyncActive = false;
parent->transaction->Reset(); parent->transaction->Reset();
} }
} }
@ -524,6 +555,11 @@ namespace Aurora::IO::Adapters
{ {
AuSPtr<AsyncStreamMemory> buffer; AuSPtr<AsyncStreamMemory> buffer;
if (parent->bAsyncActive)
{
return;
}
if (this->writesPending.size() == 1) if (this->writesPending.size() == 1)
{ {
buffer = AuMove(this->writesPending[0]); buffer = AuMove(this->writesPending[0]);
@ -599,6 +635,8 @@ namespace Aurora::IO::Adapters
AuSPtr<IAsyncStreamReader> AsyncStreamAdapter::ToStreamReader() AuSPtr<IAsyncStreamReader> AsyncStreamAdapter::ToStreamReader()
{ {
AU_LOCK_GUARD(this->mutex);
if (this->locked != 0 && this->locked != 2) if (this->locked != 0 && this->locked != 2)
{ {
return {}; return {};
@ -611,6 +649,8 @@ namespace Aurora::IO::Adapters
AuSPtr<IStreamWriter> AsyncStreamAdapter::ToStreamWriter() AuSPtr<IStreamWriter> AsyncStreamAdapter::ToStreamWriter()
{ {
AU_LOCK_GUARD(this->mutex);
if (this->locked != 0 && this->locked != 1) if (this->locked != 0 && this->locked != 1)
{ {
return {}; return {};
@ -623,6 +663,8 @@ namespace Aurora::IO::Adapters
AuSPtr<IIOWaitableItem> AsyncStreamAdapter::ToWaitable() AuSPtr<IIOWaitableItem> AsyncStreamAdapter::ToWaitable()
{ {
AU_LOCK_GUARD(this->mutex);
if (auto pGroup = this->transaction->GetCompletionGroup()) if (auto pGroup = this->transaction->GetCompletionGroup())
{ {
this->source2.SetGroup(pGroup); this->source2.SetGroup(pGroup);
@ -643,6 +685,8 @@ namespace Aurora::IO::Adapters
void AsyncStreamAdapter::ReserveBuffer(AuUInt64 uLength) void AsyncStreamAdapter::ReserveBuffer(AuUInt64 uLength)
{ {
AU_LOCK_GUARD(this->mutex);
if (!this->lastAllocation || !this->bAsyncActive) if (!this->lastAllocation || !this->bAsyncActive)
{ {
this->lastAllocation = this->AllocateNextPageCached(uLength); this->lastAllocation = this->AllocateNextPageCached(uLength);
@ -651,6 +695,8 @@ namespace Aurora::IO::Adapters
bool AsyncStreamAdapter::Reset() bool AsyncStreamAdapter::Reset()
{ {
AU_LOCK_GUARD(this->mutex);
if (this->locked == 1) if (this->locked == 1)
{ {
if (this->writer.HasWorkItems()) if (this->writer.HasWorkItems())