[+] IIOPipeWork::SetNextFrameTargetLength

[+] IIOPipeWork::GetNextFrameTargetLength
[+] IIOPipeWork::GetNextFrameTargetLength
[+] IIOPipeWork::ReallocateLater
[+] IIOPipeWork::GetSuccessfulReallocations
[+] IIOPipeWork::GetFailedReallocations
[+] IIOPipeWork::DidLastReallocFail
(will save net and engine from doing its their current hacks)
This commit is contained in:
Reece Wilson 2023-10-16 01:37:59 +01:00
parent 6a3ff1c62f
commit cb60e77c63
4 changed files with 186 additions and 12 deletions

View File

@ -53,6 +53,51 @@ namespace Aurora::IO
*/
virtual AuUInt64 GetBytesProcessedInterframe() = 0;
/**
* @brief
* @param uLength
* @return
*/
virtual AuUInt SetNextFrameTargetLength(AuUInt uLength) = 0;
/**
* @brief
* @param uLength
* @return
*/
virtual AuUInt GetNextFrameTargetLength() = 0;
/**
* @brief
* @param uLength
* @return
*/
virtual bool ReallocateLater(AuUInt uLength) = 0;
/**
* @brief
* @return
*/
virtual bool IsReallocationPending() = 0;
/**
* @brief
* @return
*/
virtual bool DidLastReallocFail() = 0;
/**
* @brief
* @return
*/
virtual AuUInt GetSuccessfulReallocations() = 0;
/**
* @brief
* @return
*/
virtual AuUInt GetFailedReallocations() = 0;
AURT_ADD_USR_DATA;
};
}

View File

@ -66,6 +66,7 @@ namespace Aurora::IO
}
else
{
this->DoReallocTick();
this->StreamPump();
}
}
@ -260,6 +261,8 @@ namespace Aurora::IO
this->bWritingAheadLowLatency = false;
}
this->DoReallocTick();
// attempt low-latency read-ahead
if (!this->bWritingAheadLowLatency &&
this->buffer_.CanWrite(this->uFrameCap_) /*ensure we can run ahead*/ &&
@ -279,9 +282,7 @@ namespace Aurora::IO
void IOPipeWork::StreamPump()
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
AuUInt canBuffer = this->GetNextFrameLength2();
AuUInt read {};
try
{
@ -306,15 +307,79 @@ namespace Aurora::IO
TryPump();
}
bool IOPipeWork::ReallocateLater(AuUInt uLength)
{
if (!uLength)
{
return false;
}
if (this->uPendingRealloc_)
{
return false;
}
this->bLastReallocFail = false;
this->uPendingRealloc_ = uLength;
return true;
}
bool IOPipeWork::IsReallocationPending()
{
return bool(this->uPendingRealloc_);
}
bool IOPipeWork::DidLastReallocFail()
{
return this->bLastReallocFail;
}
AuUInt IOPipeWork::GetSuccessfulReallocations()
{
return this->uReallocs_[0];
}
AuUInt IOPipeWork::GetFailedReallocations()
{
return this->uReallocs_[1];
}
AuUInt IOPipeWork::SetNextFrameTargetLength(AuUInt uLength)
{
return AuExchange(this->uBytesPerFrame_, uLength);
}
AuUInt IOPipeWork::GetNextFrameTargetLength()
{
return this->uBytesPerFrame_;
}
AuUInt IOPipeWork::GetNextFrameLength(AuUInt uBytesMax)
{
if (auto uAltMax = this->uBytesPerFrame_)
{
return AuMin(uAltMax, uBytesMax);
}
else
{
return uBytesMax;
}
}
AuUInt IOPipeWork::GetNextFrameLength2()
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
canBuffer = this->GetNextFrameLength(canBuffer);
return canBuffer;
}
void IOPipeWork::ReadNextAsync()
{
try
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
canBuffer = AuMin(AuUInt(this->uFrameCap_), canBuffer);
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer);
AuUInt canBuffer = this->GetNextFrameLength2();
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, GetNextFrameLength(canBuffer));
if (this->pAsyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
AuIO::EStreamError::eErrorNone)
@ -562,6 +627,38 @@ namespace Aurora::IO
return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_);
}
void IOPipeWork::FailRealloc()
{
this->uReallocs_[1]++;
this->bLastReallocFail = true;
}
void IOPipeWork::DoReallocTick()
{
auto uNextSize = AuExchange(this->uPendingRealloc_, 0);
if (!uNextSize)
{
return;
}
AuByteBuffer replacement(uNextSize, false, false);
if (!replacement)
{
this->FailRealloc();
return;
}
if (!replacement.WriteFrom(this->buffer_))
{
this->FailRealloc();
return;
}
this->buffer_ = AuMove(replacement);
this->uReallocs_[0]++;
}
AuByteBuffer *IOPipeWork::GetBuffer()
{
return &this->buffer_;

View File

@ -35,7 +35,10 @@ namespace Aurora::IO
IOPipeWork *parent {};
};
struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis<IIOPipeWork>
struct IOPipeWork :
IIOPipeWork,
IIOEventListenerFunctional,
AuEnableSharedFromThis<IIOPipeWork>
{
IOPipeWork(const AuSPtr<IOPipeProcessor> &pParent, const IOPipeRequestAIO &request);
IOPipeWork(const AuSPtr<IOPipeProcessor> &pParent, const IOPipeRequestBasic &request);
@ -60,8 +63,22 @@ namespace Aurora::IO
virtual double GetPredictedThroughput() override;
void RunOnThread();
void TerminateOnThread(bool bError = false);
virtual AuUInt SetNextFrameTargetLength(AuUInt uLength) override;
virtual AuUInt GetNextFrameTargetLength() override;
virtual bool ReallocateLater(AuUInt uLength) override;
virtual bool IsReallocationPending() override;
virtual bool DidLastReallocFail() override;
virtual AuUInt GetSuccessfulReallocations() override;
virtual AuUInt GetFailedReallocations() override;
virtual void RunOnThread();
virtual void TerminateOnThread(bool bError = false);
// INIT
void PrepareStream();
@ -77,6 +94,12 @@ namespace Aurora::IO
AuUInt32 TryPump();
void DoReallocTick();
void FailRealloc();
AuUInt GetNextFrameLength(AuUInt uBytesMax);
AuUInt GetNextFrameLength2();
AuMemoryViewWrite nextWriteAsync_;
IOPipeRequest request {};
bool bShouldReadNext {false};
@ -107,12 +130,15 @@ namespace Aurora::IO
AuUInt uBytesWritten_ {};
AuUInt uBytesWrittenLimit_ {};
AuUInt uBytesWrittenTarget_ {};
AuUInt uBytesPerFrame_ {};
AuByteBuffer buffer_;
Aurora::Utility::ThroughputCalculator throughput_;
AuUInt bytesProcessedInterframe_ {};
bool bWritingAheadLowLatency {};
bool bWritingAheadIOUOneTerminate {};
AuUInt uPendingRealloc_ {};
AuUInt uReallocs_[2] { 0, 0 };
bool bLastReallocFail { true };
};

View File

@ -695,6 +695,12 @@ namespace Aurora::IO::Protocol
((pCurrent->outputBuffer.length > target) && (bytesRem < pCurrent->uStartingSize)))
{
AuByteBuffer replacement(target, true, false);
if (!replacement)
{
this->Terminate();
return false;
}
if (!replacement.WriteFrom(pCurrent->outputBuffer))
{
this->Terminate();