AuroraRuntime/Source/IO/Protocol/AuProtocolStack.cpp

1089 lines
32 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuProtocolStack.cpp
Date: 2022-8-24
Author: Reece
***/
#include "Protocol.hpp"
#include "AuProtocolStack.hpp"
#include "AuProtocolPiece.hpp"
#include "IProtocolNext.hpp"
#include <Source/IO/AuIOPipeProcessor.hpp>
namespace Aurora::IO::Protocol
{
// Constructs an empty stack; the embedded reader and writer views are each
// handed a pointer back to this stack.
ProtocolStack::ProtocolStack()
    : readView(this)
    , writeView(this)
{
}
// Nothing to release by hand: members clean themselves up.
ProtocolStack::~ProtocolStack() = default;
// Adds a basic interceptor as the new top piece of the stack.
// An uOutputBufferSize of zero selects the runtime-configured default size.
AuSPtr<IProtocolPiece> ProtocolStack::AppendInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor,
                                                        AuUInt uOutputBufferSize)
{
    constexpr bool kPrepend = false;
    return this->AddInterceptorWhere(kPrepend, pInterceptor, uOutputBufferSize);
}
// Adds an extended interceptor as the new top piece of the stack.
// The piece is ticked once per pass (bMultipleTick = false).
AuSPtr<IProtocolPiece> ProtocolStack::AppendInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                          AuUInt uOutputBufferSize)
{
    constexpr bool kPrepend      = false;
    constexpr bool kMultipleTick = false;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick);
}
// Adds a single-frame processor as the new top piece, using the default
// output buffer size. Single-frame pieces are re-ticked while input remains
// (bMultipleTick = true).
AuSPtr<IProtocolPiece> ProtocolStack::AppendSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx)
{
    constexpr bool   kPrepend           = false;
    constexpr bool   kMultipleTick      = true;
    constexpr AuUInt kDefaultBufferSize = 0;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, kDefaultBufferSize, kMultipleTick);
}
// Adds a single-frame processor as the new top piece with a caller-chosen
// output buffer size (zero selects the runtime-configured default).
AuSPtr<IProtocolPiece> ProtocolStack::AppendSingleFrameProcessorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                   AuUInt uOutputBufferSize)
{
    constexpr bool kPrepend      = false;
    constexpr bool kMultipleTick = true;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick);
}
// Inserts a single-frame processor as the new bottom piece, using the
// default output buffer size.
AuSPtr<IProtocolPiece> ProtocolStack::PrependSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx)
{
    constexpr bool   kPrepend           = true;
    constexpr bool   kMultipleTick      = true;
    constexpr AuUInt kDefaultBufferSize = 0;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, kDefaultBufferSize, kMultipleTick);
}
// Inserts a single-frame processor as the new bottom piece with a
// caller-chosen output buffer size (zero selects the configured default).
AuSPtr<IProtocolPiece> ProtocolStack::PrependSingleFrameProcessorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                    AuUInt uOutputBufferSize)
{
    constexpr bool kPrepend      = true;
    constexpr bool kMultipleTick = true;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick);
}
// Adds an extended interceptor as the new top piece whose output buffer is
// expandable, with uOutputBufferSizeMax recorded as the piece's maximum.
AuSPtr<IProtocolPiece> ProtocolStack::AppendInterceptorDynamicBuffer(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                     AuUInt uOutputBufferSize,
                                                                     AuUInt uOutputBufferSizeMax)
{
    constexpr bool kPrepend      = false;
    constexpr bool kMultipleTick = false;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick, uOutputBufferSizeMax);
}
// Inserts an extended interceptor as the new bottom piece whose output buffer
// is expandable, with uOutputBufferSizeMax recorded as the piece's maximum.
AuSPtr<IProtocolPiece> ProtocolStack::PrependInterceptorDynamicBuffer(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                      AuUInt uOutputBufferSize,
                                                                      AuUInt uOutputBufferSizeMax)
{
    constexpr bool kPrepend      = true;
    constexpr bool kMultipleTick = false;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick, uOutputBufferSizeMax);
}
// Inserts a single-frame processor as the new bottom piece whose output
// buffer is expandable, with uOutputBufferSizeMax recorded as its maximum.
AuSPtr<IProtocolPiece> ProtocolStack::PrependSingleFrameProcessorDynamicBuffer(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                               AuUInt uOutputBufferSize,
                                                                               AuUInt uOutputBufferSizeMax)
{
    constexpr bool kPrepend      = true;
    constexpr bool kMultipleTick = true;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick, uOutputBufferSizeMax);
}
// Adds a single-frame processor as the new top piece whose output buffer is
// expandable, with uOutputBufferSizeMax recorded as its maximum.
AuSPtr<IProtocolPiece> ProtocolStack::AppendSingleFrameProcessorDynamicBuffer(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                              AuUInt uOutputBufferSize,
                                                                              AuUInt uOutputBufferSizeMax)
{
    constexpr bool kPrepend      = false;
    constexpr bool kMultipleTick = true;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick, uOutputBufferSizeMax);
}
// Inserts a basic interceptor as the new bottom piece of the stack.
// An uOutputBufferSize of zero selects the runtime-configured default size.
AuSPtr<IProtocolPiece> ProtocolStack::PrependInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor,
                                                         AuUInt uOutputBufferSize)
{
    constexpr bool kPrepend = true;
    return this->AddInterceptorWhere(kPrepend, pInterceptor, uOutputBufferSize);
}
// Inserts an extended interceptor as the new bottom piece of the stack.
// The piece is ticked once per pass (bMultipleTick = false).
AuSPtr<IProtocolPiece> ProtocolStack::PrependInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                           AuUInt uOutputBufferSize)
{
    constexpr bool kPrepend      = true;
    constexpr bool kMultipleTick = false;
    return this->AddInterceptorWhereEx(kPrepend, pInterceptorEx, uOutputBufferSize, kMultipleTick);
}
// Builds a protocol piece around a basic (non-Ex) interceptor and links it
// into the stack.
//
// bPrepend          - true: the piece becomes the new bottom piece;
//                     false: the piece becomes the new top piece.
// pInterceptor      - interceptor that will receive OnDataAvailable calls.
// uOutputBufferSize - size of the piece's output buffer; zero selects
//                     gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize.
//
// Returns the new piece, or an empty pointer when an end interceptor has
// already been added (bWrittenEnd) or an allocation failed.
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhere(bool bPrepend,
                                                          const AuSPtr<IProtocolInterceptor> &pInterceptor,
                                                          AuUInt uOutputBufferSize)
{
    AU_LOCK_GUARD(this->mutex);

    if (this->bWrittenEnd)
    {
        return {};
    }

    if (!uOutputBufferSize)
    {
        uOutputBufferSize = gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize;
    }

    auto pNew = AuMakeShared<ProtocolPiece>();
    if (!pNew)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
    if (!pNew->outputBuffer.IsValid())
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    pNew->outputBuffer.flagNoFree = true;

    // Circular ref: the writer holds an aliasing pointer that keeps pNew alive,
    // and pNew holds the writer. Destroy() breaks the cycle.
    pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));

    // Stream-writer facade for this piece: a write is copied into a fresh
    // buffer and pushed through the stack starting at the owning piece.
    struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
    {
        AuWPtr<ProtocolStack> pStack;
        AuWPtr<IProtocolInterceptor> pInterceptor;
        AuWPtr<ProtocolPiece> pParent;

        EStreamError IsOpen() override
        {
            return EStreamError::eErrorNone;
        }

        EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
        {
            auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
            if (!pCopy ||
                !(*pCopy.get()))
            {
                return EStreamError::eErrorOutOfMemory;
            }

            // The wrapper only holds weak references; either target may be
            // gone if the writer outlives the stack or the piece.
            auto pOwner = this->pStack.lock();
            auto pPiece = this->pParent.lock();
            if (!pOwner || !pPiece)
            {
                return EStreamError::eErrorStreamInterrupted;
            }

            return pOwner->DoTick(pCopy, pPiece) ?
                   EStreamError::eErrorNone :
                   EStreamError::eErrorStreamInterrupted;
        }

        void Close() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->Terminate();
            }
        }

        virtual void Flush() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->DoTick();
            }
        }

        virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
        {
            return {};
        }

        virtual AuSPtr<IStreamWriter> GetStreamWriter() override
        {
            return AuSharedFromThis();
        }
    };

    auto pWrapper = AuMakeShared<StreamWrapper>();
    if (!pWrapper)
    {
        SysPushErrorMemory();
        return {};
    }

    pWrapper->pInterceptor = pInterceptor;
    pWrapper->pStack = AuSharedFromThis();
    pWrapper->pParent = pNew;

    pNew->pWriteInteface = pWrapper;
    pNew->pParent = this;
    pNew->pInterceptor = pInterceptor;

    if (bPrepend)
    {
        if (this->pBottomPiece)
        {
            pNew->pNext = this->pBottomPiece;
        }

        this->pBottomPiece = pNew;

        // First piece in an empty stack is also the top piece; without this a
        // later append would not be linked after the prepended piece.
        if (!this->pTopPiece)
        {
            this->pTopPiece = pNew;
        }
    }
    else
    {
        if (this->pTopPiece)
        {
            this->pTopPiece->pNext = pNew;
        }

        this->pTopPiece = pNew;

        if (!this->pBottomPiece)
        {
            this->pBottomPiece = pNew;
        }
    }

    this->DiscardAllocCaches();
    return pNew;
}
// Builds a protocol piece around an extended interceptor and links it into
// the stack.
//
// bPrepend          - true: the piece becomes the new bottom piece;
//                     false: the piece becomes the new top piece.
// pInterceptor      - extended interceptor that will receive OnDataAvailable.
// uOutputBufferSize - initial size of the piece's output buffer; zero selects
//                     gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize.
// bMultipleTick     - stored on the piece; when set, DoTick re-invokes the
//                     interceptor while the input still has remaining bytes.
// uMax              - when present, the output buffer is marked expandable and
//                     the value is stored as the piece's maximum; otherwise the
//                     buffer is flagged no-free.
//
// Returns the new piece, or an empty pointer when an end interceptor has
// already been added, the stack has no source buffer, or allocation failed.
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhereEx(bool bPrepend,
                                                            const AuSPtr<IProtocolInterceptorEx> &pInterceptor,
                                                            AuUInt uOutputBufferSize,
                                                            bool bMultipleTick,
                                                            AuOptional<AuUInt> uMax)
{
    AU_LOCK_GUARD(this->mutex);

    if (this->bWrittenEnd)
    {
        return {};
    }

    if (!this->pSourceBufer)
    {
        return {};
    }

    if (!uOutputBufferSize)
    {
        uOutputBufferSize = gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize;
    }

    auto pNew = AuMakeShared<ProtocolPiece>();
    if (!pNew)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
    if (!pNew->outputBuffer.IsValid())
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    if (uMax)
    {
        pNew->uMax = uMax;
        pNew->outputBuffer.flagAlwaysExpandable = pNew->outputBuffer.flagExpandable = 1;
    }
    else
    {
        pNew->outputBuffer.flagNoFree = true;
    }

    pNew->uStartingSize = uOutputBufferSize;

    // Circular ref: the writer holds an aliasing pointer that keeps pNew alive,
    // and pNew holds the writer. Destroy() breaks the cycle.
    pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));

    // Stream-writer facade for this piece: a write is copied into a fresh
    // buffer and pushed through the stack starting at the owning piece.
    struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
    {
        AuWPtr<ProtocolStack> pStack;
        AuWPtr<IProtocolInterceptorEx> pInterceptor;
        AuWPtr<ProtocolPiece> pParent;

        EStreamError IsOpen() override
        {
            return EStreamError::eErrorNone;
        }

        EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
        {
            auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
            if (!pCopy ||
                !(*pCopy.get()))
            {
                return EStreamError::eErrorOutOfMemory;
            }

            // The wrapper only holds weak references; either target may be
            // gone if the writer outlives the stack or the piece.
            auto pOwner = this->pStack.lock();
            auto pPiece = this->pParent.lock();
            if (!pOwner || !pPiece)
            {
                return EStreamError::eErrorStreamInterrupted;
            }

            return pOwner->DoTick(pCopy, pPiece) ?
                   EStreamError::eErrorNone :
                   EStreamError::eErrorStreamInterrupted;
        }

        void Close() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->Terminate();
            }
        }

        virtual void Flush() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->DoTick();
            }
        }

        virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
        {
            return {};
        }

        virtual AuSPtr<IStreamWriter> GetStreamWriter() override
        {
            return AuSharedFromThis();
        }
    };

    auto pWrapper = AuMakeShared<StreamWrapper>();
    if (!pWrapper)
    {
        SysPushErrorMemory();
        return {};
    }

    pWrapper->pInterceptor = pInterceptor;
    pWrapper->pStack = AuSharedFromThis();
    pWrapper->pParent = pNew;

    pNew->pWriteInteface = pWrapper;
    pNew->pInterceptorEx = pInterceptor;
    pNew->pParent = this;
    pNew->bMultipleTick = bMultipleTick;

    if (bPrepend)
    {
        if (this->pBottomPiece)
        {
            pNew->pNext = this->pBottomPiece;
        }

        this->pBottomPiece = pNew;

        // First piece in an empty stack is also the top piece; without this a
        // later append would not be linked after the prepended piece.
        if (!this->pTopPiece)
        {
            this->pTopPiece = pNew;
        }
    }
    else
    {
        if (this->pTopPiece)
        {
            this->pTopPiece->pNext = pNew;
        }

        this->pTopPiece = pNew;

        if (!this->pBottomPiece)
        {
            this->pBottomPiece = pNew;
        }
    }

    this->DiscardAllocCaches();
    return pNew;
}
// Appends a terminal extended interceptor to the top of the stack and seals
// the stack: once bWrittenEnd is set, every Add* call returns empty.
// Unlike the other Add* paths, no output buffer is allocated for this piece.
//
// Returns the new piece, or an empty pointer when the stack is already
// sealed, has no source buffer, or allocation failed.
AuSPtr<IProtocolPiece> ProtocolStack::AddEndInterceptor(const AuSPtr<IProtocolInterceptorEx> &pInterceptor)
{
    AU_LOCK_GUARD(this->mutex);

    if (this->bWrittenEnd)
    {
        return {};
    }

    if (!this->pSourceBufer)
    {
        return {};
    }

    auto pNew = AuMakeShared<ProtocolPiece>();
    if (!pNew)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    // Stream-writer facade for this piece: a write is copied into a fresh
    // buffer and pushed through the stack starting at the owning piece.
    struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
    {
        AuWPtr<ProtocolStack> pStack;
        AuWPtr<IProtocolInterceptorEx> pInterceptor;
        AuWPtr<ProtocolPiece> pParent;

        EStreamError IsOpen() override
        {
            return EStreamError::eErrorNone;
        }

        EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
        {
            auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
            if (!pCopy ||
                !(*pCopy.get()))
            {
                return EStreamError::eErrorOutOfMemory;
            }

            // The wrapper only holds weak references; either target may be
            // gone if the writer outlives the stack or the piece.
            auto pOwner = this->pStack.lock();
            auto pPiece = this->pParent.lock();
            if (!pOwner || !pPiece)
            {
                return EStreamError::eErrorStreamInterrupted;
            }

            return pOwner->DoTick(pCopy, pPiece) ?
                   EStreamError::eErrorNone :
                   EStreamError::eErrorStreamInterrupted;
        }

        void Close() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->Terminate();
            }
        }

        virtual void Flush() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->DoTick();
            }
        }

        virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
        {
            return {};
        }

        virtual AuSPtr<IStreamWriter> GetStreamWriter() override
        {
            return AuSharedFromThis();
        }
    };

    auto pWrapper = AuMakeShared<StreamWrapper>();
    if (!pWrapper)
    {
        SysPushErrorMemory();
        return {};
    }

    pWrapper->pInterceptor = pInterceptor;
    pWrapper->pStack = AuSharedFromThis();
    pWrapper->pParent = pNew;

    pNew->pWriteInteface = pWrapper;
    pNew->pInterceptorEx = pInterceptor;
    pNew->pParent = this;

    if (this->pTopPiece)
    {
        this->pTopPiece->pNext = pNew;
    }

    this->pTopPiece = pNew;

    if (!this->pBottomPiece)
    {
        this->pBottomPiece = pNew;
    }

    this->bWrittenEnd = true;
    this->DiscardAllocCaches();
    return pNew;
}
// Tears the stack down under the stack mutex: releases the source buffer when
// this stack owns it, then walks the piece chain from the bottom, clearing
// each piece's links and references before dropping both end pointers.
void ProtocolStack::Destroy()
{
    AU_LOCK_GUARD(this->mutex);

    if (this->bOwnsSource && this->pSourceBufer)
    {
        this->pSourceBufer->Reset();
        this->pSourceBufer.reset();
    }

    for (auto pWalk = this->pBottomPiece; pWalk; )
    {
        // Grab the successor first; the link is cleared below.
        auto pFollowing = pWalk->pNext;

        pWalk->pNext.reset();
        pWalk->pInterceptor.reset();
        pWalk->pInterceptorEx.reset();
        pWalk->pOuputWriter.reset();
        pWalk->pWriteInteface.reset();

        pWalk = pFollowing;
    }

    this->pBottomPiece.reset();
    this->pTopPiece.reset();
}
// Exposes the embedded write view as a stream writer that shares ownership
// with this stack. Only available when the stack owns its source buffer;
// otherwise an empty pointer is returned.
AuSPtr<IStreamWriter> ProtocolStack::AsStreamWriter()
{
    if (this->bOwnsSource)
    {
        return AuSPtr<IStreamWriter>(this->SharedFromThis(), &this->writeView);
    }

    return {};
}
// Returns the source buffer for direct writing when this stack owns it;
// otherwise an empty pointer.
AuSPtr<Memory::ByteBuffer> ProtocolStack::AsWritableByteBuffer()
{
    if (this->bOwnsSource)
    {
        return this->pSourceBufer;
    }

    return {};
}
// Exposes the embedded read view as a stream reader that shares ownership
// with this stack. Yields an empty pointer when AsReadableByteBuffer() has
// nothing to offer.
AuSPtr<IStreamReader> ProtocolStack::AsStreamReader()
{
    if (auto pReadable = AsReadableByteBuffer())
    {
        return AuSPtr<IStreamReader>(this->SharedFromThis(), &this->readView);
    }

    return {};
}
// Returns the buffer holding the stack's output:
//  - with no pieces installed, the source buffer itself;
//  - otherwise the top piece's output buffer, as a pointer that shares
//    ownership with that piece, or empty when that buffer reports IsEmpty().
AuSPtr<Memory::ByteBuffer> ProtocolStack::AsReadableByteBuffer()
{
    const auto &pTop = this->pTopPiece;

    if (!pTop)
    {
        return this->pSourceBufer;
    }

    if (pTop->outputBuffer.IsEmpty())
    {
        return {};
    }

    return AuSPtr<AuByteBuffer>(pTop, &pTop->outputBuffer);
}
// Marks the stack dead under the stack mutex. If the associated pipe work is
// still alive it is ended first; the result of End() is deliberately ignored.
void ProtocolStack::Terminate()
{
    AU_LOCK_GUARD(this->mutex);

    if (auto pPipe = AuTryLockMemoryType(this->pPipeWork))
    {
        (void)pPipe->End();
    }

    this->bDead = true;
}
// Public tick entry point: takes the stack mutex and runs one root-level
// tick with reads permitted.
bool ProtocolStack::DoTick()
{
    AU_LOCK_GUARD(this->mutex);

    constexpr bool kPermitRead = true;
    return this->DoTickEx(kPermitRead);
}
// Runs one root-level tick over the source buffer, starting at the bottom
// piece. Does not take the stack mutex itself.
//
// Returns false immediately when the stack is dead, has no source buffer or
// has no pieces. For ad-hoc stream stacks the source buffer is topped up
// before the tick, and after a failed tick a forced read plus one retry is
// attempted unless bKillPipeOnFirstRootLevelFalse is set. When that flag is
// set, a failed tick marks the stack dead (and, for pipe stacks, terminates
// the pipe work on its thread).
//
// NOTE: bPermitRead is not consulted by this implementation.
bool ProtocolStack::DoTickEx(bool bPermitRead)
{
    if (this->bDead || !this->pSourceBufer || !this->pBottomPiece)
    {
        return false;
    }

    if (this->eType == EProtocolStackCreateType::eByBufferedAdhocStream &&
        !this->TryReadMore())
    {
        return false;
    }

    if (this->DoTick(this->pSourceBufer, {}))
    {
        return true;
    }

    if (this->bKillPipeOnFirstRootLevelFalse)
    {
        if (this->eType == EProtocolStackCreateType::eByPipe)
        {
            if (auto pPipe = AuTryLockMemoryType(this->pPipeWork))
            {
                pPipe->TerminateOnThread(true);
            }
        }

        this->bDead = true;
        return false;
    }

    // Second chance for ad-hoc streams: pull more input, then tick once more.
    return this->eType == EProtocolStackCreateType::eByBufferedAdhocStream &&
           this->TryReadMore(true) &&
           this->DoTick(this->pSourceBufer, {});
}
bool ProtocolStack::TryReadMore(bool bForce)
{
if (!bForce)
{
if (this->pSourceBufer->readPtr !=
this->pSourceBufer->writePtr)
{
return true;
}
}
auto pSource = this->pStreamReaderAsInputSource;
if (!pSource)
{
return true;
}
auto uRemBytes = this->pSourceBufer->RemainingWrite();
auto uFragment = this->uPreferredFragmentSize ?
AuMin<AuUInt>(uRemBytes, this->uPreferredFragmentSize) :
uRemBytes;
AuUInt uBytesRead {};
if (AuIO::EStreamError::eErrorNone !=
this->pStreamReaderAsInputSource->Read({ AuStaticCast<AuMemoryViewWrite>(*this->pSourceBufer), uBytesRead }))
{
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
{
pStack->TerminateOnThread(true);
}
return false;
}
this->pSourceBufer->writePtr += uBytesRead;
return true;
}
// Feeds pRead into one piece of the stack and, for extended interceptors,
// recursively forwards that piece's output to the next piece.
//
// pRead  - input buffer for the piece.
// pPiece - piece to tick; when empty the bottom piece is used.
//
// Extended interceptor path:
//  - the output target is the drain buffer when pPiece is the top piece and a
//    drain buffer is set, otherwise the piece's own output buffer (or nothing
//    if that buffer is empty);
//  - when the interceptor reports failure, the output read head is restored
//    and the result is whether an earlier iteration had input left;
//  - pieces with a maximum (uMax) terminate the stack once the pending output
//    reaches that maximum, and otherwise may have their output buffer
//    reallocated to a new target size;
//  - pieces with bMultipleTick repeat while pRead has remaining bytes.
// Basic interceptor path: the readable span of pRead is handed to the
// interceptor together with the piece's next writer.
//
// Returns false (instead of dereferencing a null pointer) when there is no
// piece to tick, or when the basic path has no input buffer or interceptor.
bool ProtocolStack::DoTick(const AuSPtr<AuByteBuffer> &pRead, const AuSPtr<ProtocolPiece> &pPiece)
{
    auto pCurrent = pPiece ? pPiece : this->pBottomPiece;
    if (!pCurrent)
    {
        // Reachable when a stale writer ticks a stack whose pieces are gone.
        return false;
    }

    if (pCurrent->pInterceptorEx)
    {
        bool bTryAgain {};
        bool bTryAgainAtleastOnce {};

        do
        {
            auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ?
                               (this->pDrainBuffer) :
                               (!pCurrent->outputBuffer.IsEmpty() ? AuSPtr<AuByteBuffer>(pCurrent, &pCurrent->outputBuffer) : AuSPtr<AuByteBuffer> {});

            // Remember the read head so a failed callback can be rolled back.
            auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr;

            if (pRead)
            {
                if (pRead->flagReadError)
                {
                    return false;
                }
            }

            bool bStatus = pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream, pCurrent);
            if (!bStatus)
            {
                if (pNextStream)
                {
                    pNextStream->readPtr = pOldHead;
                }

                return bTryAgainAtleastOnce;
            }
            else
            {
                if (pCurrent->uMax.has_value())
                {
                    auto bytesRem = pCurrent->outputBuffer.RemainingBytes();
                    auto maxVal = pCurrent->uMax.value();

                    if (bytesRem >= maxVal)
                    {
                        this->Terminate();
                        return false;
                    }

                    auto target = AuMin(AuPageRound<AuUInt>(bytesRem + ((bytesRem / 4) * 3), 128), maxVal);
                    target = AuMax<AuUInt>(target, pCurrent->uStartingSize);

                    if (target >= pCurrent->outputBuffer.allocSize ||
                        ((pCurrent->outputBuffer.length > target) && (bytesRem < pCurrent->uStartingSize)))
                    {
                        AuByteBuffer replacement(target, true, false);
                        if (!replacement)
                        {
                            this->Terminate();
                            return false;
                        }

                        if (!replacement.WriteFrom(pCurrent->outputBuffer))
                        {
                            this->Terminate();
                            return false;
                        }

                        pCurrent->outputBuffer = AuMove(replacement);
                        pCurrent->outputBuffer.flagAlwaysExpandable =
                            pCurrent->outputBuffer.flagExpandable = 1;
                    }
                }
            }

            if (!pNextStream)
            {
                return true;
            }

            if (auto pNext = pCurrent->pNext)
            {
                auto pForwardHead = pNextStream->readPtr;
                bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext);
                if (!bDontCareAboutSubTicks)
                {
                    pNextStream->readPtr = pForwardHead;
                }
            }

            if (pCurrent->bMultipleTick)
            {
                // pRead is treated as optional above; stay consistent here.
                bTryAgain = pRead && pRead->RemainingBytes();
                bTryAgainAtleastOnce |= bTryAgain;
            }

            if (pCurrent->outputBuffer)
            {
                if (pCurrent->outputBuffer.flagWriteError)
                {
                    return false;
                }
            }
        }
        while (bTryAgain);

        return true;
    }
    else
    {
        if (!pRead || !pCurrent->pInterceptor)
        {
            return false;
        }

        AuUInt8 *pBase {};
        AuUInt uCount {};

        if (pRead->writePtr >= pRead->readPtr)
        {
            uCount = pRead->writePtr - pRead->readPtr;
            pBase = pRead->readPtr;
        }
        else if (pRead->flagCircular)
        {
            // Wrapped circular buffer: expose only the span up to the end of
            // the backing storage.
            uCount = (pRead->base + pRead->length) - pRead->readPtr;
            pBase = pRead->readPtr;
        }

        auto pNextStream = pCurrent->ToNextWriter();

        if (!pCurrent->pInterceptor->OnDataAvailable({ pBase, uCount }, pNextStream))
        {
            return false;
        }

        return true;
    }
}
// Snapshots the piece chain, bottom piece first, under the stack mutex.
AuList<AuSPtr<IProtocolPiece>> ProtocolStack::GetArrayOfInterceptors()
{
    AU_LOCK_GUARD(this->mutex);

    AuList<AuSPtr<IProtocolPiece>> list;

    for (auto pWalk = this->pBottomPiece; pWalk; )
    {
        auto pFollowing = pWalk->pNext;
        list.push_back(pWalk);
        pWalk = pFollowing;
    }

    return list;
}
// Returns the piece at position uIndex counted from the bottom piece
// (index 0), or an empty pointer when the chain is shorter than that.
AuSPtr<IProtocolPiece> ProtocolStack::GetInterceptorAtIndex(AuUInt32 uIndex)
{
    AU_LOCK_GUARD(this->mutex);

    AuUInt32 uPosition {};

    for (auto pWalk = this->pBottomPiece; pWalk; ++uPosition)
    {
        if (uPosition == uIndex)
        {
            return pWalk;
        }

        auto pFollowing = pWalk->pNext;
        pWalk = pFollowing;
    }

    return {};
}
// A stack is valid until it has been marked dead.
bool ProtocolStack::IsValid()
{
    const bool bAlive = !this->bDead;
    return bAlive;
}
// Resets the cached stream reader and stream writer members. Called by the
// Add* methods after the piece chain has changed.
void ProtocolStack::DiscardAllocCaches()
{
    AuResetMember(this->pStreamReaderCache);

    AuResetMember(this->pStreamWriterCache);
}
// Creates a stack of type eByBuffered that owns a freshly allocated source
// buffer of uLength bytes.
//
// uLength - size of the source buffer; zero is rejected as a bad argument.
// ex      - extra flags copied onto the stack; the kill-on-first-false flag
//           is only applied when it carries a value.
//
// Returns the new stack, or an empty pointer on a bad argument or a failed
// allocation.
static AuSPtr<IProtocolStack> _NewBufferedProtocolStack(AuUInt uLength,
                                                        ProtocolStackExtraData ex)
{
    if (!uLength)
    {
        SysPushErrorArg();
        return {};
    }

    auto pNewStack = AuMakeShared<ProtocolStack>();
    if (!pNewStack)
    {
        SysPushErrorMem();
        return {};
    }

    pNewStack->eType = EProtocolStackCreateType::eByBuffered;

    auto pBacking = AuMakeShared<AuByteBuffer>(uLength, true);
    if (!pBacking)
    {
        SysPushErrorMem();
        return {};
    }

    if (!pBacking->IsValid())
    {
        return {};
    }

    pNewStack->bOwnsSource = true;
    pNewStack->pSourceBufer = pBacking;
    pNewStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;

    if (ex.bKillPipeOnFirstRootLevelFalse)
    {
        pNewStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
    }

    return pNewStack;
}
// Exported factory: buffered protocol stack with default extra flags.
AUKN_SYM AuSPtr<IProtocolStack> NewBufferedProtocolStack(AuUInt uLength)
{
    ProtocolStackExtraData defaultFlags {};
    return _NewBufferedProtocolStack(uLength, defaultFlags);
}
// Creates a stack of type eByPipe whose source buffer is the pipe work's
// buffer.
//
// pWork                          - pipe work to attach to; must not be empty.
// bAutoTick                      - when set, the stack is registered on the
//                                  pipe work (pProtocolStack); registering a
//                                  second stack on the same work is an error.
// bKillPipeOnFirstRootLevelFalse - copied onto the stack, then overridden by
//                                  the same-named extra flag when that has a
//                                  value.
// ex                             - extra flags.
//
// Returns the new stack, or an empty pointer on a bad argument, allocation
// failure, or duplicate auto-tick registration.
static AuSPtr<IProtocolStack> _NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork,
                                                        bool bAutoTick,
                                                        bool bKillPipeOnFirstRootLevelFalse,
                                                        ProtocolStackExtraData ex)
{
    if (!pWork)
    {
        SysPushErrorArg();
        return {};
    }

    auto pNewStack = AuMakeShared<ProtocolStack>();
    if (!pNewStack)
    {
        SysPushErrorMem();
        return {};
    }

    pNewStack->eType = EProtocolStackCreateType::eByPipe;

    auto pWorkEx = AuStaticCast<IOPipeWork>(pWork);
    pNewStack->pPipeWork = pWorkEx;

    pNewStack->bKillPipeOnFirstRootLevelFalse = bKillPipeOnFirstRootLevelFalse;
    if (ex.bKillPipeOnFirstRootLevelFalse)
    {
        pNewStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
    }

    pNewStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;

    if (!bAutoTick)
    {
        // Manual ticking: the buffer pointer shares ownership with the work.
        pNewStack->pSourceBufer = AuSPtr<AuByteBuffer>(pWork, AuStaticCast<IOPipeWork>(pWork)->GetBuffer());
        return pNewStack;
    }

    if (pWorkEx->pProtocolStack)
    {
        SysPushErrorGeneric("NewProtocolStackFromPipe called twice. wont recv DoTick");
        return {};
    }

    pWorkEx->pProtocolStack = pNewStack;
    pNewStack->pSourceBufer = AuUnsafeRaiiToShared(pWorkEx->GetBuffer());
    return pNewStack;
}
// Creates a stack of type eByBufferedAdhocStream: it owns a source buffer of
// data.uBufferedStreamLength bytes that is refilled from data.pStreamReader.
//
// data - stream reader, buffer length (zero is rejected as a bad argument)
//        and preferred fragment size.
// ex   - extra flags copied onto the stack; the kill-on-first-false flag is
//        only applied when it carries a value.
//
// Returns the new stack, or an empty pointer on a bad argument or a failed
// allocation.
static AuSPtr<IProtocolStack> _NewProtocolStackFromStream(const ProtocolStackByBufferedAdhoc &data,
                                                          ProtocolStackExtraData ex)
{
    auto pNewStack = AuMakeShared<ProtocolStack>();
    if (!pNewStack)
    {
        SysPushErrorMem();
        return {};
    }

    if (!data.uBufferedStreamLength)
    {
        SysPushErrorArg();
        return {};
    }

    pNewStack->eType = EProtocolStackCreateType::eByBufferedAdhocStream;

    auto pBacking = AuMakeShared<AuByteBuffer>(data.uBufferedStreamLength, true);
    if (!pBacking)
    {
        SysPushErrorMem();
        return {};
    }

    if (!pBacking->IsValid())
    {
        return {};
    }

    if (ex.bKillPipeOnFirstRootLevelFalse)
    {
        pNewStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
    }

    pNewStack->bOwnsSource = true;
    pNewStack->pSourceBufer = pBacking;

    pNewStack->pStreamReaderAsInputSource = data.pStreamReader;
    pNewStack->uPreferredFragmentSize = data.uPreferredFragmentSize;

    pNewStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;
    pNewStack->bStreamReaderFlagAdhocRead = ex.bStreamReaderFlagAdhocReadFromAdhoc;

    return pNewStack;
}
// Exported factory: pipe-backed protocol stack with default extra flags.
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork,
                                                         bool bAutoTick,
                                                         bool bKillPipeOnFirstRootLevelFalse)
{
    ProtocolStackExtraData defaultFlags {};
    return _NewProtocolStackFromPipe(pWork, bAutoTick, bKillPipeOnFirstRootLevelFalse, defaultFlags);
}
// Exported factory: dispatches on description.type to the matching private
// constructor helper, always forwarding description.extendedFlags.
// An invalid type is rejected as a bad argument and yields an empty pointer.
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromDescription(const ProtocolStackDescription &description)
{
    if (!EProtocolStackCreateTypeIsValid(description.type))
    {
        SysPushErrorArg();
        return {};
    }

    const auto &extraFlags = description.extendedFlags;

    switch (description.type)
    {
    case EProtocolStackCreateType::eByPipe:
    {
        const auto &pipeArgs = description.byPipe;
        return _NewProtocolStackFromPipe(pipeArgs.pWork,
                                         pipeArgs.bAutoTick,
                                         pipeArgs.bKillPipeOnFirstRootLevelFalse,
                                         extraFlags);
    }
    case EProtocolStackCreateType::eByBuffered:
    {
        return _NewBufferedProtocolStack(description.byBuffered.uBufferedStreamLength, extraFlags);
    }
    case EProtocolStackCreateType::eByBufferedAdhocStream:
    {
        return _NewProtocolStackFromStream(description.byBufferedStream, extraFlags);
    }
    default:
        SysUnreachable();
    }
}
// Convenience helper: builds an ad-hoc stream stack fed by pStreamReader,
// installs the given interceptors in order, and returns the stack's stream
// reader.
//
// pStreamReader          - input source for the stack.
// pairs                  - (bSingleFrame, nextBufferSize, interceptor) tuples.
//                          Single-frame entries are appended as single-frame
//                          processors. A non-single-frame entry is appended as
//                          a regular extended interceptor, except when it is
//                          the last entry, where it is added as the end
//                          interceptor.
// uBaseStreamLength      - size of the stack's source buffer.
// uPreferredFragmentSize - preferred read fragment size for the stack.
// pOptOutStack           - optional; receives the stack on success.
//
// Returns an empty pointer if the stack or any piece could not be created;
// otherwise whatever the stack's AsStreamReader() yields.
AUKN_SYM AuSPtr<IStreamReader> UtilityWrapStreamReaderWithInterceptors(AuSPtr<IStreamReader> pStreamReader,
                                                                       AuList<AuTuple<bool /* bSingleFrame*/, AuUInt /*nextBufferSize*/, AuSPtr<IProtocolInterceptorEx>>> pairs,
                                                                       AuUInt uBaseStreamLength,
                                                                       AuUInt uPreferredFragmentSize,
                                                                       AuSPtr<IProtocolStack> *pOptOutStack)
{
    ProtocolStackDescription desc;
    desc.type = EProtocolStackCreateType::eByBufferedAdhocStream;
    desc.byBufferedStream.pStreamReader = pStreamReader;
    desc.byBufferedStream.uBufferedStreamLength = uBaseStreamLength;
    desc.byBufferedStream.uPreferredFragmentSize = uPreferredFragmentSize;
    desc.extendedFlags.bStreamReaderFlagAdhocReadFromAdhoc = true;

    AuSPtr<IProtocolStack> pStack = NewProtocolStackFromDescription(desc);
    if (!pStack)
    {
        SysPushErrorNested();
        return {};
    }

    for (AU_ITERATE_N(i, pairs.size()))
    {
        const auto &[bSingle, uNextBufferSize, pPointer] = pairs[i];

        AuSPtr<IProtocolPiece> pAdded;
        if (bSingle)
        {
            pAdded = pStack->AppendSingleFrameProcessorEx(pPointer, uNextBufferSize);
        }
        else if (i == pairs.size() - 1)
        {
            pAdded = pStack->AddEndInterceptor(pPointer);
        }
        else
        {
            pAdded = pStack->AppendInterceptorEx(pPointer, uNextBufferSize);
        }

        if (!pAdded)
        {
            SysPushErrorNested();
            return {};
        }
    }

    if (pOptOutStack)
    {
        *pOptOutStack = pStack;
    }

    return pStack->AsStreamReader();
}
}