From e1453529200d2751f04c88c7dedb90184921a26c Mon Sep 17 00:00:00 2001 From: Reece Wilson Date: Tue, 8 Nov 2022 19:24:08 +0000 Subject: [PATCH] [+] IProtocolStack::AppendSingleFrameProcessor [+] IProtocolStack::AppendSingleFrameProcessorEx [+] ICompressionInterceptor::FlushNextFrame [+] ICompressionInterceptor::ConfigureAutoFlushPerFrame --- .../Compression/CompressionInterceptor.hpp | 13 +++ Include/Aurora/IO/Protocol/IProtocolStack.hpp | 26 +++++- Source/Compression/CompressionInterceptor.cpp | 25 ++++++ Source/Compression/CompressionInterceptor.hpp | 6 ++ Source/IO/Protocol/ProtocolPiece.hpp | 1 + Source/IO/Protocol/ProtocolStack.cpp | 80 ++++++++++++------- Source/IO/Protocol/ProtocolStack.hpp | 4 +- 7 files changed, 126 insertions(+), 29 deletions(-) diff --git a/Include/Aurora/Compression/CompressionInterceptor.hpp b/Include/Aurora/Compression/CompressionInterceptor.hpp index b40fd309..fb4ea51d 100644 --- a/Include/Aurora/Compression/CompressionInterceptor.hpp +++ b/Include/Aurora/Compression/CompressionInterceptor.hpp @@ -15,6 +15,19 @@ namespace Aurora::Compression { virtual bool HasFailed() = 0; + /** + * @brief + */ + virtual void FlushNextFrame() = 0; + + /** + * @brief When enabled, each IO tick is considered a logical frame instead of a streaming, meaning that a flush will occur + * To be used in conjuction with IProtocolStack::AppendSingleFrameProcessorEx + * @param bAutoFlush + * @return + */ + virtual bool ConfigureAutoFlushPerFrame(bool bAutoFlush) = 0; + // TODO (Reece): interface potential virtual bool LimitHasHit() = 0; diff --git a/Include/Aurora/IO/Protocol/IProtocolStack.hpp b/Include/Aurora/IO/Protocol/IProtocolStack.hpp index 157db4be..22b673a3 100644 --- a/Include/Aurora/IO/Protocol/IProtocolStack.hpp +++ b/Include/Aurora/IO/Protocol/IProtocolStack.hpp @@ -35,6 +35,29 @@ namespace Aurora::IO::Protocol */ virtual AuSPtr AppendInterceptorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) = 0; + /** + * @brief Same as AppendInterceptor, except that DoTick will repeat until the input is fully consumed. + * This allows us to tick processors under our frame, followed by a retick if there is more data available. + * Such paradigm contrasts the default stream-specific/non-framing behaviour where the processor is + * responsible for consuming all non-segmented data (ala everything except the start of the final unfinished frame). + * With such limitations, it's impossible to build a protocol where a piece in the middle + * 1) can/should only flush 1 frame to the next processor + * 2) reads only a portion of pReadInByteBuffer + * This function sets an internal flag that indicates re-ticks from the root are to be expected to complete + * a tick. Otherwise, the read head of the inbound frame of a persumed stream may only skip back to frame zero + * on error OR frame +1 on success. With this flag, frame current to frame end will be ticked. + * @param pInterceptorEx + * @return + */ + virtual AuSPtr AppendSingleFrameProcessor(const AuSPtr &pInterceptorEx) = 0; + + /** + * @brief See: AppendSingleFrameProcessor + * @param pInterceptorEx + * @return + */ + virtual AuSPtr AppendSingleFrameProcessorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) = 0; + /** * @brief Inserts the interceptor at the bottom of the protocol stack * @param pInterceptorEx @@ -53,7 +76,8 @@ namespace Aurora::IO::Protocol /** * @brief Sends one tick down the protocol stack, regardless of how much data is written into the * next piece/interceptor, and regardless of if another read tick is required. - * Latterly, you are responsible for consuming all available bytes in your interceptor. + * Latterly, you are responsible for consuming all available bytes in your non-framed + * (~AppendSingleFrameProcessor[Ex]) interceptors. */ virtual void DoTick() = 0; diff --git a/Source/Compression/CompressionInterceptor.cpp b/Source/Compression/CompressionInterceptor.cpp index bea58996..2d2fe51f 100644 --- a/Source/Compression/CompressionInterceptor.cpp +++ b/Source/Compression/CompressionInterceptor.cpp @@ -53,6 +53,11 @@ namespace Aurora::Compression { this->pLastBuffer_ = pReadInByteBuffer; this->pBaseStream_->SetBuffer(pWriteOutByteBuffer); + + if (this->LimitHasHit()) + { + return pWriteOutByteBuffer->WriteFromEx(*pReadInByteBuffer, AuNumericLimits::max()); // intentionally non-ex. + } bool bSuccess { true }; do @@ -67,6 +72,15 @@ namespace Aurora::Compression bSuccess = a != 0; } while (bSuccess); + + if ((this->bAutoFlush_) || + (AuExchange(this->bSendFlush_, false))) + { + if (!this->pBaseStream_->Flush()) + { + SysPushErrorIO("Couldn't flush stream. I hope this means no data was available/already flushed quirk occurred bc im still not setting bErrorFlag_ yet. This message might explain session instability. "); + } + } if (this->LimitHasHit()) { @@ -78,10 +92,21 @@ namespace Aurora::Compression this->bErrorFlag_ = true; } + this->pLastBuffer_.reset(); this->pBaseStream_->SetBuffer({}); return true; } + void CompressionInterceptor::FlushNextFrame() + { + this->bSendFlush_ = true; + } + + bool CompressionInterceptor::ConfigureAutoFlushPerFrame(bool bAutoFlush) + { + return AuExchange(this->bAutoFlush_, bAutoFlush); + } + bool CompressionInterceptor::LimitHasHit() { return this->uCount_ >= this->uCountMax_; diff --git a/Source/Compression/CompressionInterceptor.hpp b/Source/Compression/CompressionInterceptor.hpp index 83ec4c3d..7ebc503e 100644 --- a/Source/Compression/CompressionInterceptor.hpp +++ b/Source/Compression/CompressionInterceptor.hpp @@ -27,6 +27,10 @@ namespace Aurora::Compression inline virtual void Close() override; + void FlushNextFrame() override; + + bool ConfigureAutoFlushPerFrame(bool bAutoFlush) override; + bool LimitHasHit() override; void LimitReset() override; void LimitSet(AuUInt uLength) override; @@ -36,6 +40,8 @@ namespace Aurora::Compression private: bool bErrorFlag_ {}; + bool bSendFlush_ {}; + bool bAutoFlush_ {}; AuSPtr pStream_; AuSPtr pBaseStream_; AuSPtr pLastBuffer_; diff --git a/Source/IO/Protocol/ProtocolPiece.hpp b/Source/IO/Protocol/ProtocolPiece.hpp index 28c9a45b..bbca7f68 100644 --- a/Source/IO/Protocol/ProtocolPiece.hpp +++ b/Source/IO/Protocol/ProtocolPiece.hpp @@ -21,6 +21,7 @@ namespace Aurora::IO::Protocol AuSPtr pNext; AuByteBuffer outputBuffer; AuSPtr pOuputWriter; + bool bMultipleTick {}; void Remove() override; AuSPtr ToNextWriter() override; diff --git a/Source/IO/Protocol/ProtocolStack.cpp b/Source/IO/Protocol/ProtocolStack.cpp index 653aa2ba..e15094a3 100644 --- a/Source/IO/Protocol/ProtocolStack.cpp +++ b/Source/IO/Protocol/ProtocolStack.cpp @@ -26,7 +26,18 @@ namespace Aurora::IO::Protocol AuSPtr ProtocolStack::AppendInterceptorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) { - return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize); + return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, false); + } + + AuSPtr ProtocolStack::AppendSingleFrameProcessor(const AuSPtr &pInterceptorEx) + { + return this->AddInterceptorWhereEx(false, pInterceptorEx, 0, true); + } + + AuSPtr ProtocolStack::AppendSingleFrameProcessorEx(const AuSPtr &pInterceptorEx, + AuUInt uOutputBufferSize) + { + return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, true); } AuSPtr ProtocolStack::PrependInterceptor(const AuSPtr &pInterceptor, @@ -38,7 +49,7 @@ namespace Aurora::IO::Protocol AuSPtr ProtocolStack::PrependInterceptorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) { - return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize); + return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, false); } AuSPtr ProtocolStack::AddInterceptorWhere(bool prepend, @@ -150,7 +161,8 @@ namespace Aurora::IO::Protocol AuSPtr ProtocolStack::AddInterceptorWhereEx(bool prepend, const AuSPtr &pInterceptor, - AuUInt uOutputBufferSize) + AuUInt uOutputBufferSize, + bool bMultipleTick) { if (this->bWrittenEnd) { @@ -237,6 +249,7 @@ namespace Aurora::IO::Protocol pNew->pWriteInteface = pWrapper; pNew->pInterceptorEx = pInterceptor; pNew->pParent = this; + pNew->bMultipleTick = bMultipleTick; if (prepend) { @@ -439,37 +452,50 @@ namespace Aurora::IO::Protocol if (pCurrent->pInterceptorEx) { - auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ? - (this->pDrainBuffer) : - (!pCurrent->outputBuffer.IsEmpty() ? AuSPtr(pCurrent, &pCurrent->outputBuffer) : AuSPtr {}); - - auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr; - - if (!pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream)) + bool bTryAgain {}; + bool bTryAgainAtleastOnce {}; + do { - if (pNextStream) + auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ? + (this->pDrainBuffer) : + (!pCurrent->outputBuffer.IsEmpty() ? AuSPtr(pCurrent, &pCurrent->outputBuffer) : AuSPtr {}); + + auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr; + + if (!pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream)) { - pNextStream->readPtr = pOldHead; + if (pNextStream) + { + pNextStream->readPtr = pOldHead; + } + + return bTryAgainAtleastOnce; } - return false; - } - if (!pNextStream) - { - return true; - } - - if (auto pNext = pCurrent->pNext) - { - auto pOldHead = pNextStream->readPtr; - - bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext); - - if (!bDontCareAboutSubTicks) + if (!pNextStream) { - pNextStream->readPtr = pOldHead; + return true; + } + + if (auto pNext = pCurrent->pNext) + { + auto pOldHead = pNextStream->readPtr; + + bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext); + + if (!bDontCareAboutSubTicks) + { + pNextStream->readPtr = pOldHead; + } + } + + if (pCurrent->bMultipleTick) + { + bTryAgain = pRead->RemainingBytes(); + bTryAgainAtleastOnce |= bTryAgain; } } + while (bTryAgain); return true; } diff --git a/Source/IO/Protocol/ProtocolStack.hpp b/Source/IO/Protocol/ProtocolStack.hpp index e6a99b02..4a806043 100644 --- a/Source/IO/Protocol/ProtocolStack.hpp +++ b/Source/IO/Protocol/ProtocolStack.hpp @@ -20,6 +20,8 @@ namespace Aurora::IO::Protocol AuSPtr AppendInterceptor(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize) override; AuSPtr AppendInterceptorEx(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize) override; + AuSPtr AppendSingleFrameProcessor(const AuSPtr &pInterceptorEx) override; + AuSPtr AppendSingleFrameProcessorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) override; AuSPtr PrependInterceptor(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize) override; AuSPtr PrependInterceptorEx(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize) override; AuSPtr AddEndInterceptor(const AuSPtr &pInterceptor) override; @@ -27,7 +29,7 @@ namespace Aurora::IO::Protocol void Destroy() override; AuSPtr AddInterceptorWhere(bool prepend, const AuSPtr &pInterceptor, AuUInt uOutputBufferSize); - AuSPtr AddInterceptorWhereEx(bool prepend, const AuSPtr &pInterceptor, AuUInt uOutputBufferSize); + AuSPtr AddInterceptorWhereEx(bool prepend, const AuSPtr &pInterceptor, AuUInt uOutputBufferSize, bool bMultipleTick); AuSPtr AsStreamWriter() override; AuSPtr AsWritableByteBuffer() override;