[+] IProtocolStack::AppendSingleFrameProcessor

[+] IProtocolStack::AppendSingleFrameProcessorEx
[+] ICompressionInterceptor::FlushNextFrame
[+] ICompressionInterceptor::ConfigureAutoFlushPerFrame
This commit is contained in:
Reece Wilson 2022-11-08 19:24:08 +00:00
parent 745b9f974a
commit e145352920
7 changed files with 126 additions and 29 deletions

View File

@ -15,6 +15,19 @@ namespace Aurora::Compression
{
virtual bool HasFailed() = 0;
/**
* @brief
*/
virtual void FlushNextFrame() = 0;
/**
* @brief When enabled, each IO tick is considered a logical frame instead of a streaming, meaning that a flush will occur
* To be used in conjuction with IProtocolStack::AppendSingleFrameProcessorEx
* @param bAutoFlush
* @return
*/
virtual bool ConfigureAutoFlushPerFrame(bool bAutoFlush) = 0;
// TODO (Reece): interface potential
virtual bool LimitHasHit() = 0;

View File

@ -35,6 +35,29 @@ namespace Aurora::IO::Protocol
*/
virtual AuSPtr<IProtocolPiece> AppendInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx, AuUInt uOutputBufferSize) = 0;
/**
* @brief Same as AppendInterceptor, except that DoTick will repeat until the input is fully consumed.
* This allows us to tick processors under our frame, followed by a retick if there is more data available.
* Such paradigm contrasts the default stream-specific/non-framing behaviour where the processor is
* responsible for consuming all non-segmented data (ala everything except the start of the final unfinished frame).
* With such limitations, it's impossible to build a protocol where a piece in the middle
* 1) can/should only flush 1 frame to the next processor
* 2) reads only a portion of pReadInByteBuffer
* This function sets an internal flag that indicates re-ticks from the root are to be expected to complete
* a tick. Otherwise, the read head of the inbound frame of a persumed stream may only skip back to frame zero
* on error OR frame +1 on success. With this flag, frame current to frame end will be ticked.
* @param pInterceptorEx
* @return
*/
virtual AuSPtr<IProtocolPiece> AppendSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx) = 0;
/**
* @brief See: AppendSingleFrameProcessor
* @param pInterceptorEx
* @return
*/
virtual AuSPtr<IProtocolPiece> AppendSingleFrameProcessorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx, AuUInt uOutputBufferSize) = 0;
/**
* @brief Inserts the interceptor at the bottom of the protocol stack
* @param pInterceptorEx
@ -53,7 +76,8 @@ namespace Aurora::IO::Protocol
/**
* @brief Sends one tick down the protocol stack, regardless of how much data is written into the
* next piece/interceptor, and regardless of if another read tick is required.
* Latterly, you are responsible for consuming all available bytes in your interceptor.
* Latterly, you are responsible for consuming all available bytes in your non-framed
* (~AppendSingleFrameProcessor[Ex]) interceptors.
*/
virtual void DoTick() = 0;

View File

@ -53,6 +53,11 @@ namespace Aurora::Compression
{
this->pLastBuffer_ = pReadInByteBuffer;
this->pBaseStream_->SetBuffer(pWriteOutByteBuffer);
if (this->LimitHasHit())
{
return pWriteOutByteBuffer->WriteFromEx(*pReadInByteBuffer, AuNumericLimits<AuUInt>::max()); // intentionally non-ex.
}
bool bSuccess { true };
do
@ -67,6 +72,15 @@ namespace Aurora::Compression
bSuccess = a != 0;
}
while (bSuccess);
if ((this->bAutoFlush_) ||
(AuExchange(this->bSendFlush_, false)))
{
if (!this->pBaseStream_->Flush())
{
SysPushErrorIO("Couldn't flush stream. I hope this means no data was available/already flushed quirk occurred bc im still not setting bErrorFlag_ yet. This message might explain session instability. ");
}
}
if (this->LimitHasHit())
{
@ -78,10 +92,21 @@ namespace Aurora::Compression
this->bErrorFlag_ = true;
}
this->pLastBuffer_.reset();
this->pBaseStream_->SetBuffer({});
return true;
}
void CompressionInterceptor::FlushNextFrame()
{
this->bSendFlush_ = true;
}
bool CompressionInterceptor::ConfigureAutoFlushPerFrame(bool bAutoFlush)
{
return AuExchange(this->bAutoFlush_, bAutoFlush);
}
bool CompressionInterceptor::LimitHasHit()
{
return this->uCount_ >= this->uCountMax_;

View File

@ -27,6 +27,10 @@ namespace Aurora::Compression
inline virtual void Close() override;
void FlushNextFrame() override;
bool ConfigureAutoFlushPerFrame(bool bAutoFlush) override;
bool LimitHasHit() override;
void LimitReset() override;
void LimitSet(AuUInt uLength) override;
@ -36,6 +40,8 @@ namespace Aurora::Compression
private:
bool bErrorFlag_ {};
bool bSendFlush_ {};
bool bAutoFlush_ {};
AuSPtr<ICompressionStream> pStream_;
AuSPtr<BaseStream> pBaseStream_;
AuSPtr<Memory::ByteBuffer> pLastBuffer_;

View File

@ -21,6 +21,7 @@ namespace Aurora::IO::Protocol
AuSPtr<ProtocolPiece> pNext;
AuByteBuffer outputBuffer;
AuSPtr<IStreamWriter> pOuputWriter;
bool bMultipleTick {};
void Remove() override;
AuSPtr<IStreamWriter> ToNextWriter() override;

View File

@ -26,7 +26,18 @@ namespace Aurora::IO::Protocol
AuSPtr<IProtocolPiece> ProtocolStack::AppendInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize);
return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, false);
}
AuSPtr<IProtocolPiece> ProtocolStack::AppendSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx)
{
return this->AddInterceptorWhereEx(false, pInterceptorEx, 0, true);
}
AuSPtr<IProtocolPiece> ProtocolStack::AppendSingleFrameProcessorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, true);
}
AuSPtr<IProtocolPiece> ProtocolStack::PrependInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor,
@ -38,7 +49,7 @@ namespace Aurora::IO::Protocol
AuSPtr<IProtocolPiece> ProtocolStack::PrependInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize);
return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, false);
}
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhere(bool prepend,
@ -150,7 +161,8 @@ namespace Aurora::IO::Protocol
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhereEx(bool prepend,
const AuSPtr<IProtocolInterceptorEx> &pInterceptor,
AuUInt uOutputBufferSize)
AuUInt uOutputBufferSize,
bool bMultipleTick)
{
if (this->bWrittenEnd)
{
@ -237,6 +249,7 @@ namespace Aurora::IO::Protocol
pNew->pWriteInteface = pWrapper;
pNew->pInterceptorEx = pInterceptor;
pNew->pParent = this;
pNew->bMultipleTick = bMultipleTick;
if (prepend)
{
@ -439,37 +452,50 @@ namespace Aurora::IO::Protocol
if (pCurrent->pInterceptorEx)
{
auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ?
(this->pDrainBuffer) :
(!pCurrent->outputBuffer.IsEmpty() ? AuSPtr<AuByteBuffer>(pCurrent, &pCurrent->outputBuffer) : AuSPtr<AuByteBuffer> {});
auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr;
if (!pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream))
bool bTryAgain {};
bool bTryAgainAtleastOnce {};
do
{
if (pNextStream)
auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ?
(this->pDrainBuffer) :
(!pCurrent->outputBuffer.IsEmpty() ? AuSPtr<AuByteBuffer>(pCurrent, &pCurrent->outputBuffer) : AuSPtr<AuByteBuffer> {});
auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr;
if (!pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream))
{
pNextStream->readPtr = pOldHead;
if (pNextStream)
{
pNextStream->readPtr = pOldHead;
}
return bTryAgainAtleastOnce;
}
return false;
}
if (!pNextStream)
{
return true;
}
if (auto pNext = pCurrent->pNext)
{
auto pOldHead = pNextStream->readPtr;
bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext);
if (!bDontCareAboutSubTicks)
if (!pNextStream)
{
pNextStream->readPtr = pOldHead;
return true;
}
if (auto pNext = pCurrent->pNext)
{
auto pOldHead = pNextStream->readPtr;
bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext);
if (!bDontCareAboutSubTicks)
{
pNextStream->readPtr = pOldHead;
}
}
if (pCurrent->bMultipleTick)
{
bTryAgain = pRead->RemainingBytes();
bTryAgainAtleastOnce |= bTryAgain;
}
}
while (bTryAgain);
return true;
}

View File

@ -20,6 +20,8 @@ namespace Aurora::IO::Protocol
AuSPtr<IProtocolPiece> AppendInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor, AuUInt uOutputBufferSize) override;
AuSPtr<IProtocolPiece> AppendInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptor, AuUInt uOutputBufferSize) override;
AuSPtr<IProtocolPiece> AppendSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx) override;
AuSPtr<IProtocolPiece> AppendSingleFrameProcessorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx, AuUInt uOutputBufferSize) override;
AuSPtr<IProtocolPiece> PrependInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor, AuUInt uOutputBufferSize) override;
AuSPtr<IProtocolPiece> PrependInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptor, AuUInt uOutputBufferSize) override;
AuSPtr<IProtocolPiece> AddEndInterceptor(const AuSPtr<IProtocolInterceptorEx> &pInterceptor) override;
@ -27,7 +29,7 @@ namespace Aurora::IO::Protocol
void Destroy() override;
AuSPtr<IProtocolPiece> AddInterceptorWhere(bool prepend, const AuSPtr<IProtocolInterceptor> &pInterceptor, AuUInt uOutputBufferSize);
AuSPtr<IProtocolPiece> AddInterceptorWhereEx(bool prepend, const AuSPtr<IProtocolInterceptorEx> &pInterceptor, AuUInt uOutputBufferSize);
AuSPtr<IProtocolPiece> AddInterceptorWhereEx(bool prepend, const AuSPtr<IProtocolInterceptorEx> &pInterceptor, AuUInt uOutputBufferSize, bool bMultipleTick);
AuSPtr<IStreamWriter> AsStreamWriter() override;
AuSPtr<Memory::ByteBuffer> AsWritableByteBuffer() override;