diff --git a/Include/Aurora/IO/IIOPipeWork.hpp b/Include/Aurora/IO/IIOPipeWork.hpp index 11798bb7..a6c3130f 100644 --- a/Include/Aurora/IO/IIOPipeWork.hpp +++ b/Include/Aurora/IO/IIOPipeWork.hpp @@ -53,6 +53,51 @@ namespace Aurora::IO */ virtual AuUInt64 GetBytesProcessedInterframe() = 0; + /** + * @brief + * @param uLength + * @return + */ + virtual AuUInt SetNextFrameTargetLength(AuUInt uLength) = 0; + + /** + * @brief + * @param uLength + * @return + */ + virtual AuUInt GetNextFrameTargetLength() = 0; + + /** + * @brief + * @param uLength + * @return + */ + virtual bool ReallocateLater(AuUInt uLength) = 0; + + /** + * @brief + * @return + */ + virtual bool IsReallocationPending() = 0; + + /** + * @brief + * @return + */ + virtual bool DidLastReallocFail() = 0; + + /** + * @brief + * @return + */ + virtual AuUInt GetSuccessfulReallocations() = 0; + + /** + * @brief + * @return + */ + virtual AuUInt GetFailedReallocations() = 0; + AURT_ADD_USR_DATA; }; } \ No newline at end of file diff --git a/Source/IO/AuIOPipeProcessor.cpp b/Source/IO/AuIOPipeProcessor.cpp index 028bc1ca..906003f8 100644 --- a/Source/IO/AuIOPipeProcessor.cpp +++ b/Source/IO/AuIOPipeProcessor.cpp @@ -66,6 +66,7 @@ namespace Aurora::IO } else { + this->DoReallocTick(); this->StreamPump(); } } @@ -259,6 +260,8 @@ namespace Aurora::IO this->bWritingAheadLowLatency = false; } + + this->DoReallocTick(); // attempt low-latency read-ahead if (!this->bWritingAheadLowLatency && @@ -279,9 +282,7 @@ namespace Aurora::IO void IOPipeWork::StreamPump() { - AuUInt canBuffer = this->buffer_.RemainingWrite(); - canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); - + AuUInt canBuffer = this->GetNextFrameLength2(); AuUInt read {}; try { @@ -306,15 +307,79 @@ namespace Aurora::IO TryPump(); } + bool IOPipeWork::ReallocateLater(AuUInt uLength) + { + if (!uLength) + { + return false; + } + + if (this->uPendingRealloc_) + { + return false; + } + + this->bLastReallocFail = false; + this->uPendingRealloc_ = uLength; + return true; + } + + bool IOPipeWork::IsReallocationPending() + { + return bool(this->uPendingRealloc_); + } + + bool IOPipeWork::DidLastReallocFail() + { + return this->bLastReallocFail; + } + + AuUInt IOPipeWork::GetSuccessfulReallocations() + { + return this->uReallocs_[0]; + } + + AuUInt IOPipeWork::GetFailedReallocations() + { + return this->uReallocs_[1]; + } + + AuUInt IOPipeWork::SetNextFrameTargetLength(AuUInt uLength) + { + return AuExchange(this->uBytesPerFrame_, uLength); + } + + AuUInt IOPipeWork::GetNextFrameTargetLength() + { + return this->uBytesPerFrame_; + } + + AuUInt IOPipeWork::GetNextFrameLength(AuUInt uBytesMax) + { + if (auto uAltMax = this->uBytesPerFrame_) + { + return AuMin(uAltMax, uBytesMax); + } + else + { + return uBytesMax; + } + } + + AuUInt IOPipeWork::GetNextFrameLength2() + { + AuUInt canBuffer = this->buffer_.RemainingWrite(); + canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); + canBuffer = this->GetNextFrameLength(canBuffer); + return canBuffer; + } + void IOPipeWork::ReadNextAsync() { try { - AuUInt canBuffer = this->buffer_.RemainingWrite(); - canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); - canBuffer = AuMin(AuUInt(this->uFrameCap_), canBuffer); - - this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer); + AuUInt canBuffer = this->GetNextFrameLength2(); + this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, GetNextFrameLength(canBuffer)); if (this->pAsyncStreamReader_->BeginRead(AuSPtr(this->SharedFromThis(), &this->nextWriteAsync_)) != AuIO::EStreamError::eErrorNone) @@ -562,6 +627,38 @@ namespace Aurora::IO return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_); } + void IOPipeWork::FailRealloc() + { + this->uReallocs_[1]++; + this->bLastReallocFail = true; + } + + void IOPipeWork::DoReallocTick() + { + auto uNextSize = AuExchange(this->uPendingRealloc_, 0); + + if (!uNextSize) + { + return; + } + + AuByteBuffer replacement(uNextSize, false, false); + if (!replacement) + { + this->FailRealloc(); + return; + } + + if (!replacement.WriteFrom(this->buffer_)) + { + this->FailRealloc(); + return; + } + + this->buffer_ = AuMove(replacement); + this->uReallocs_[0]++; + } + AuByteBuffer *IOPipeWork::GetBuffer() { return &this->buffer_; diff --git a/Source/IO/AuIOPipeProcessor.hpp b/Source/IO/AuIOPipeProcessor.hpp index f7f54db9..f9ae71a1 100644 --- a/Source/IO/AuIOPipeProcessor.hpp +++ b/Source/IO/AuIOPipeProcessor.hpp @@ -35,7 +35,10 @@ namespace Aurora::IO IOPipeWork *parent {}; }; - struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis + struct IOPipeWork : + IIOPipeWork, + IIOEventListenerFunctional, + AuEnableSharedFromThis { IOPipeWork(const AuSPtr &pParent, const IOPipeRequestAIO &request); IOPipeWork(const AuSPtr &pParent, const IOPipeRequestBasic &request); @@ -60,8 +63,22 @@ namespace Aurora::IO virtual double GetPredictedThroughput() override; - void RunOnThread(); - void TerminateOnThread(bool bError = false); + virtual AuUInt SetNextFrameTargetLength(AuUInt uLength) override; + + virtual AuUInt GetNextFrameTargetLength() override; + + virtual bool ReallocateLater(AuUInt uLength) override; + + virtual bool IsReallocationPending() override; + + virtual bool DidLastReallocFail() override; + + virtual AuUInt GetSuccessfulReallocations() override; + + virtual AuUInt GetFailedReallocations() override; + + virtual void RunOnThread(); + virtual void TerminateOnThread(bool bError = false); // INIT void PrepareStream(); @@ -77,6 +94,12 @@ namespace Aurora::IO AuUInt32 TryPump(); + void DoReallocTick(); + void FailRealloc(); + + AuUInt GetNextFrameLength(AuUInt uBytesMax); + AuUInt GetNextFrameLength2(); + AuMemoryViewWrite nextWriteAsync_; IOPipeRequest request {}; bool bShouldReadNext {false}; @@ -107,12 +130,15 @@ namespace Aurora::IO AuUInt uBytesWritten_ {}; AuUInt uBytesWrittenLimit_ {}; AuUInt uBytesWrittenTarget_ {}; + AuUInt uBytesPerFrame_ {}; AuByteBuffer buffer_; Aurora::Utility::ThroughputCalculator throughput_; AuUInt bytesProcessedInterframe_ {}; bool bWritingAheadLowLatency {}; bool bWritingAheadIOUOneTerminate {}; - + AuUInt uPendingRealloc_ {}; + AuUInt uReallocs_[2] { 0, 0 }; + bool bLastReallocFail { true }; }; diff --git a/Source/IO/Protocol/AuProtocolStack.cpp b/Source/IO/Protocol/AuProtocolStack.cpp index 023e09b6..be7dced8 100644 --- a/Source/IO/Protocol/AuProtocolStack.cpp +++ b/Source/IO/Protocol/AuProtocolStack.cpp @@ -695,6 +695,12 @@ namespace Aurora::IO::Protocol ((pCurrent->outputBuffer.length > target) && (bytesRem < pCurrent->uStartingSize))) { AuByteBuffer replacement(target, true, false); + if (!replacement) + { + this->Terminate(); + return false; + } + if (!replacement.WriteFrom(pCurrent->outputBuffer)) { this->Terminate();