From 664eb270dcd488a7cc31c9b78c571b292d3061f7 Mon Sep 17 00:00:00 2001 From: Reece Date: Tue, 25 Apr 2023 06:36:31 +0100 Subject: [PATCH] [+] IIOPipeWork::GetBytesProcessedInterframe [*] IOPipeProcessors will now cap reported progression to the requested length if the requested page size would mean we overrun it. --- Include/Aurora/IO/IIOPipeWork.hpp | 6 ++++ Source/IO/AuIOPipeProcessor.cpp | 48 ++++++++++++++++++++++++++++--- Source/IO/AuIOPipeProcessor.hpp | 3 ++ 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/Include/Aurora/IO/IIOPipeWork.hpp b/Include/Aurora/IO/IIOPipeWork.hpp index 792583ea..8cadb871 100644 --- a/Include/Aurora/IO/IIOPipeWork.hpp +++ b/Include/Aurora/IO/IIOPipeWork.hpp @@ -46,5 +46,11 @@ namespace Aurora::IO * @return */ virtual AuUInt64 GetBytesProcessed() = 0; + + /** + * @brief bytes read written to the destination. can be used under any callback to determine interframe progress + * @return + */ + virtual AuUInt64 GetBytesProcessedInterframe() = 0; }; } \ No newline at end of file diff --git a/Source/IO/AuIOPipeProcessor.cpp b/Source/IO/AuIOPipeProcessor.cpp index 82c67438..643cbb7f 100644 --- a/Source/IO/AuIOPipeProcessor.cpp +++ b/Source/IO/AuIOPipeProcessor.cpp @@ -183,6 +183,11 @@ namespace Aurora::IO return this->uBytesWritten_; } + AuUInt64 IOPipeWork::GetBytesProcessedInterframe() + { + return this->uBytesWritten_ + this->bytesProcessedInterframe_; + } + void IOPipeWork::PrepareStream() { if (!this->buffer_.IsEmpty()) @@ -322,12 +327,16 @@ namespace Aurora::IO AuUInt32 IOPipeWork::TryPump() { - AuUInt bytesProcessedTotal {}; + AuUInt &bytesProcessedTotal = this->bytesProcessedInterframe_; AuUInt bytesProcessed {}; + bool bIsCullingLastFrame {}; + + bytesProcessedTotal = 0; do { - AuUInt canRead = this->buffer_.RemainingBytes(); + AuUInt canRead2 = this->buffer_.RemainingBytes(); + AuUInt canRead = canRead2; if (!canRead) { break; @@ -342,6 +351,18 @@ namespace Aurora::IO auto oldReadHeadPtr = this->buffer_.readPtr; auto readHead = oldReadHeadPtr - this->buffer_.base; + auto oldWriteHeadPtr = this->buffer_.writePtr; + auto writeHead = oldWriteHeadPtr - this->buffer_.base; + + auto uInterframeProgress = this->GetBytesProcessedInterframe(); + + if (bIsCullingLastFrame = (canRead2 + uInterframeProgress > this->uBytesWrittenLimit_)) + { + auto uLastFrameBytes = this->uBytesWrittenLimit_ - uInterframeProgress; + auto uAbsDataToRead = AuMin(canRead, uLastFrameBytes); + this->buffer_.writePtr = this->buffer_.readPtr + uAbsDataToRead; + } + if (pProtocolStack) { pProtocolStack->DoTick(); @@ -394,6 +415,17 @@ namespace Aurora::IO bytesProcessedTotal += bytesProcessed; + if (oldWriteHeadPtr != this->buffer_.writePtr) + { + this->buffer_.writePtr = this->buffer_.base + writeHead; + } + + if (bIsCullingLastFrame) + { + this->bShouldReadNext = false; + break; + } + if (this->buffer_.readPtr == this->buffer_.writePtr) { this->buffer_.readPtr = this->buffer_.base; @@ -441,9 +473,12 @@ namespace Aurora::IO } while (AuExchange(bytesProcessed, 0)); - if (this->buffer_.readPtr == this->buffer_.base) + if (!bIsCullingLastFrame) { - this->bShouldReadNext = true; + if (this->buffer_.readPtr == this->buffer_.base) + { + this->bShouldReadNext = true; + } } this->uBytesWritten_ += bytesProcessedTotal; @@ -455,6 +490,11 @@ namespace Aurora::IO this->request.pListener->OnPipePartialEvent(bytesProcessedTotal); } + if (bIsCullingLastFrame) + { + TerminateOnThread(false); + } + return bytesProcessedTotal; } diff --git a/Source/IO/AuIOPipeProcessor.hpp b/Source/IO/AuIOPipeProcessor.hpp index 1b82625d..71beaa0d 100644 --- a/Source/IO/AuIOPipeProcessor.hpp +++ b/Source/IO/AuIOPipeProcessor.hpp @@ -56,6 +56,7 @@ namespace Aurora::IO virtual AuInt64 GetStartTickMS() override; virtual AuUInt64 GetBytesProcessed() override; + virtual AuUInt64 GetBytesProcessedInterframe() override; virtual double GetPredictedThroughput() override; @@ -108,6 +109,8 @@ namespace Aurora::IO AuUInt uBytesWrittenTarget_ {}; AuByteBuffer buffer_; Utility::ThroughputCalculator throughput_; + AuUInt bytesProcessedInterframe_ {}; + };