1120 lines
33 KiB
C++
1120 lines
33 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: AuProtocolStack.cpp
|
|
Date: 2022-8-24
|
|
Author: Reece
|
|
***/
|
|
#include "Protocol.hpp"
|
|
#include "AuProtocolStack.hpp"
|
|
#include "AuProtocolPiece.hpp"
|
|
#include "IProtocolNext.hpp"
|
|
#include <Source/IO/AuIOPipeProcessor.hpp>
|
|
|
|
namespace Aurora::IO::Protocol
|
|
{
|
|
/**
 * Constructs the stack. The only work done here is handing both stream views
 * a pointer back to this stack; nothing else is initialised in this body.
 */
ProtocolStack::ProtocolStack() :
    readView(this),
    writeView(this)
{ }
|
|
|
|
/**
 * Intentionally empty. Unlinking of the piece chain is performed by Destroy(),
 * not by the destructor.
 */
ProtocolStack::~ProtocolStack()
{ }
|
|
|
|
/**
 * Public entry point for adding a legacy (non-Ex) interceptor.
 * Forwards its arguments unchanged to AddInterceptorWhere().
 *
 * @param eWhere            prepend or append position
 * @param pInterceptor      interceptor to wrap in a new piece
 * @param uOutputBufferSize output buffer size; 0 selects the runtime default
 * @return the new piece, or an empty pointer on failure
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptor(EProtocolWhere eWhere,
                                                     const AuSPtr<IProtocolInterceptor> &pInterceptor,
                                                     AuUInt uOutputBufferSize)
{
    auto pPiece = this->AddInterceptorWhere(eWhere, pInterceptor, uOutputBufferSize);
    return pPiece;
}
|
|
|
|
/**
 * Adds an extended interceptor with a fixed-size output buffer.
 * Forwards to AddInterceptorWhereEx() with bMultipleTick = false and no
 * maximum-size argument.
 *
 * @param eWhere            prepend or append position
 * @param pInterceptorEx    interceptor to wrap in a new piece
 * @param uOutputBufferSize output buffer size; 0 selects the runtime default
 * @return the new piece, or an empty pointer on failure
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorEx(EProtocolWhere eWhere,
                                                       const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                       AuUInt uOutputBufferSize)
{
    constexpr bool kMultipleTick = false;

    auto pPiece = this->AddInterceptorWhereEx(eWhere, pInterceptorEx, uOutputBufferSize, kMultipleTick);
    return pPiece;
}
|
|
|
|
/**
 * Adds a single-frame processor using the default output buffer size.
 * Forwards to AddInterceptorWhereEx() with a size of 0 (replaced there by the
 * runtime default) and bMultipleTick = true.
 *
 * @param eWhere         prepend or append position
 * @param pInterceptorEx interceptor to wrap in a new piece
 * @return the new piece, or an empty pointer on failure
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddSingleFrameProcessor(EProtocolWhere eWhere,
                                                              const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx)
{
    constexpr bool kMultipleTick = true;

    auto pPiece = this->AddInterceptorWhereEx(eWhere, pInterceptorEx, 0, kMultipleTick);
    return pPiece;
}
|
|
|
|
/**
 * Adds a single-frame processor with a caller-chosen output buffer size.
 * Forwards to AddInterceptorWhereEx() with bMultipleTick = true.
 *
 * @param eWhere            prepend or append position
 * @param pInterceptorEx    interceptor to wrap in a new piece
 * @param uOutputBufferSize output buffer size; 0 selects the runtime default
 * @return the new piece, or an empty pointer on failure
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddSingleFrameProcessorEx(EProtocolWhere eWhere,
                                                                const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                AuUInt uOutputBufferSize)
{
    constexpr bool kMultipleTick = true;

    auto pPiece = this->AddInterceptorWhereEx(eWhere, pInterceptorEx, uOutputBufferSize, kMultipleTick);
    return pPiece;
}
|
|
|
|
/**
 * Adds an extended interceptor whose output buffer may grow.
 * Forwards to AddInterceptorWhereEx() with bMultipleTick = false and the
 * supplied maximum size.
 *
 * @param eWhere               prepend or append position
 * @param pInterceptorEx       interceptor to wrap in a new piece
 * @param uOutputBufferSize    initial output buffer size; 0 selects the runtime default
 * @param uOutputBufferSizeMax upper bound passed through as the piece's maximum
 * @return the new piece, or an empty pointer on failure
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorDynamicBuffer(EProtocolWhere eWhere,
                                                                  const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                  AuUInt uOutputBufferSize,
                                                                  AuUInt uOutputBufferSizeMax)
{
    constexpr bool kMultipleTick = false;

    auto pPiece = this->AddInterceptorWhereEx(eWhere,
                                              pInterceptorEx,
                                              uOutputBufferSize,
                                              kMultipleTick,
                                              uOutputBufferSizeMax);
    return pPiece;
}
|
|
|
|
/**
 * Adds a single-frame processor whose output buffer may grow.
 * Forwards to AddInterceptorWhereEx() with bMultipleTick = true and the
 * supplied maximum size.
 *
 * @param eWhere               prepend or append position
 * @param pInterceptorEx       interceptor to wrap in a new piece
 * @param uOutputBufferSize    initial output buffer size; 0 selects the runtime default
 * @param uOutputBufferSizeMax upper bound passed through as the piece's maximum
 * @return the new piece, or an empty pointer on failure
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddSingleFrameProcessorDynamicBuffer(EProtocolWhere eWhere,
                                                                           const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
                                                                           AuUInt uOutputBufferSize,
                                                                           AuUInt uOutputBufferSizeMax)
{
    constexpr bool kMultipleTick = true;

    auto pPiece = this->AddInterceptorWhereEx(eWhere,
                                              pInterceptorEx,
                                              uOutputBufferSize,
                                              kMultipleTick,
                                              uOutputBufferSizeMax);
    return pPiece;
}
|
|
|
|
/**
 * Wraps a legacy (non-Ex) interceptor in a new ProtocolPiece and links it into
 * the piece chain at the requested end.
 *
 * The piece receives a fixed-size (flagNoRealloc) output buffer, a BlobWriter
 * over that buffer, and a StreamWrapper that lets the interceptor push data to
 * the following piece.
 *
 * @param eWhere            EProtocolWhere::ePrepend links at the bottom of the
 *                          chain; any other valid value links at the top
 * @param pInterceptor      interceptor to attach; must not be null
 * @param uOutputBufferSize output buffer size; 0 selects
 *                          gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize
 * @return the new piece, or an empty pointer when an argument is invalid, an
 *         end interceptor has already been added, or an allocation fails
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhere(EProtocolWhere eWhere,
                                                          const AuSPtr<IProtocolInterceptor> &pInterceptor,
                                                          AuUInt uOutputBufferSize)
{
    if (!EProtocolWhereIsValid(eWhere))
    {
        SysPushErrorArg();
        return {};
    }

    if (!pInterceptor)
    {
        SysPushErrorArg();
        return {};
    }

    AU_LOCK_GUARD(this->mutex);

    // Once AddEndInterceptor() has run, the chain is sealed.
    if (this->bWrittenEnd)
    {
        return {};
    }

    if (!uOutputBufferSize)
    {
        uOutputBufferSize = gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize;
    }

    auto pNew = AuMakeShared<ProtocolPiece>();
    if (!pNew)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
    if (!pNew->outputBuffer.IsValid())
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    pNew->outputBuffer.flagNoRealloc = true;

    // Circular ref: the writer holds an aliasing pointer that keeps pNew alive,
    // and pNew owns the writer. Destroy() breaks the cycle by resetting pOuputWriter.
    pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));
    if (!pNew->pOuputWriter)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    // Writer handed to the interceptor; everything written to it is ticked
    // through the stack starting at the owning piece.
    struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
    {
        AuWPtr<ProtocolStack> pStack;
        AuWPtr<IProtocolInterceptor> pInterceptor;
        AuWPtr<ProtocolPiece> pParent;

        EStreamError IsOpen() override
        {
            return EStreamError::eErrorNone;
        }

        EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
        {
            // NOTE(review): outVariable is zeroed and never updated on success;
            // confirm callers do not rely on a written-byte count.
            parameters.outVariable = 0;

            auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
            if (!pCopy ||
                !(*pCopy.get()))
            {
                return EStreamError::eErrorOutOfMemory;
            }

            // The stack is only weakly referenced; it may already be gone.
            auto pStack = AuTryLockMemoryType(this->pStack);
            if (!pStack)
            {
                return EStreamError::eErrorStreamInterrupted;
            }

            return pStack->DoTick(pCopy, this->pParent.lock()) ?
                   EStreamError::eErrorNone :
                   EStreamError::eErrorStreamInterrupted;
        }

        void Close() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->Terminate();
            }
        }

        virtual void Flush() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->DoTick();
            }
        }

        virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
        {
            return {};
        }

        virtual AuSPtr<IStreamWriter> GetStreamWriter() override
        {
            return AuSharedFromThis();
        }
    };

    auto pWrapper = AuMakeShared<StreamWrapper>();
    if (!pWrapper)
    {
        SysPushErrorMemory();
        return {};
    }

    pWrapper->pInterceptor = pInterceptor;
    pWrapper->pStack = AuSharedFromThis();
    pWrapper->pParent = pNew;

    pNew->pWriteInteface = pWrapper;
    pNew->pParent = this;
    pNew->pInterceptor = pInterceptor;

    if (eWhere == EProtocolWhere::ePrepend)
    {
        if (auto pBottomPiece = this->pBottomPiece)
        {
            pBottomPiece->pBefore = pNew;
            pNew->pNext = pBottomPiece;
        }

        this->pBottomPiece = pNew;

        // Mirror of the append branch: the first piece of an empty chain is
        // both bottom and top. Without this a later append would replace
        // pTopPiece without ever linking it to this piece.
        if (!this->pTopPiece)
        {
            this->pTopPiece = pNew;
        }
    }
    else
    {
        if (this->pTopPiece)
        {
            pNew->pBefore = this->pTopPiece;
            this->pTopPiece->pNext = pNew;
        }

        this->pTopPiece = pNew;

        if (!this->pBottomPiece)
        {
            this->pBottomPiece = pNew;
        }
    }

    this->DiscardAllocCaches();

    return pNew;
}
|
|
|
|
/**
 * Wraps an extended interceptor in a new ProtocolPiece and links it into the
 * piece chain at the requested end.
 *
 * @param eWhere            EProtocolWhere::ePrepend links at the bottom of the
 *                          chain; any other valid value links at the top
 * @param pInterceptor      interceptor to attach; must not be null
 * @param uOutputBufferSize initial output buffer size; 0 selects
 *                          gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize
 * @param bMultipleTick     stored on the piece; DoTick() re-invokes the
 *                          interceptor while input bytes remain when set
 * @param uMax              when present the output buffer is marked expandable
 *                          and the value is stored as the piece's maximum;
 *                          when absent the buffer is marked flagNoRealloc
 * @return the new piece, or an empty pointer when an argument is invalid, an
 *         end interceptor has already been added, the stack has no source
 *         buffer, or an allocation fails
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhereEx(EProtocolWhere eWhere,
                                                            const AuSPtr<IProtocolInterceptorEx> &pInterceptor,
                                                            AuUInt uOutputBufferSize,
                                                            bool bMultipleTick,
                                                            AuOptional<AuUInt> uMax)
{
    if (!EProtocolWhereIsValid(eWhere))
    {
        SysPushErrorArg();
        return {};
    }

    if (!pInterceptor)
    {
        SysPushErrorArg();
        return {};
    }

    AU_LOCK_GUARD(this->mutex);

    // Once AddEndInterceptor() has run, the chain is sealed.
    if (this->bWrittenEnd)
    {
        return {};
    }

    if (!this->pSourceBufer)
    {
        return {};
    }

    if (!uOutputBufferSize)
    {
        uOutputBufferSize = gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize;
    }

    auto pNew = AuMakeShared<ProtocolPiece>();
    if (!pNew)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
    if (!pNew->outputBuffer.IsValid())
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    if (uMax)
    {
        pNew->uMax = uMax;
        pNew->outputBuffer.flagAlwaysExpandable = pNew->outputBuffer.flagExpandable = 1;
    }
    else
    {
        pNew->outputBuffer.flagNoRealloc = true;
    }

    // DoTick() never shrinks a dynamic buffer below this size.
    pNew->uStartingSize = uOutputBufferSize;

    // Circular ref: the writer holds an aliasing pointer that keeps pNew alive,
    // and pNew owns the writer. Destroy() breaks the cycle by resetting pOuputWriter.
    pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));
    if (!pNew->pOuputWriter)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    // Writer handed to the interceptor; everything written to it is ticked
    // through the stack starting at the owning piece.
    struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
    {
        AuWPtr<ProtocolStack> pStack;
        AuWPtr<IProtocolInterceptorEx> pInterceptor;
        AuWPtr<ProtocolPiece> pParent;

        EStreamError IsOpen() override
        {
            return EStreamError::eErrorNone;
        }

        EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
        {
            // NOTE(review): outVariable is zeroed and never updated on success;
            // confirm callers do not rely on a written-byte count.
            parameters.outVariable = 0;

            auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
            if (!pCopy ||
                !(*pCopy.get()))
            {
                return EStreamError::eErrorOutOfMemory;
            }

            // The stack is only weakly referenced; it may already be gone.
            auto pStack = AuTryLockMemoryType(this->pStack);
            if (!pStack)
            {
                return EStreamError::eErrorStreamInterrupted;
            }

            return pStack->DoTick(pCopy, this->pParent.lock()) ?
                   EStreamError::eErrorNone :
                   EStreamError::eErrorStreamInterrupted;
        }

        void Close() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->Terminate();
            }
        }

        virtual void Flush() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->DoTick();
            }
        }

        virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
        {
            return {};
        }

        virtual AuSPtr<IStreamWriter> GetStreamWriter() override
        {
            return AuSharedFromThis();
        }
    };

    auto pWrapper = AuMakeShared<StreamWrapper>();
    if (!pWrapper)
    {
        SysPushErrorMemory();
        return {};
    }

    pWrapper->pInterceptor = pInterceptor;
    pWrapper->pStack = AuSharedFromThis();
    pWrapper->pParent = pNew;

    pNew->pWriteInteface = pWrapper;
    pNew->pInterceptorEx = pInterceptor;
    pNew->pParent = this;
    pNew->bMultipleTick = bMultipleTick;

    if (eWhere == EProtocolWhere::ePrepend)
    {
        if (auto pBottomPiece = this->pBottomPiece)
        {
            pBottomPiece->pBefore = pNew;
            pNew->pNext = pBottomPiece;
        }

        this->pBottomPiece = pNew;

        // Mirror of the append branch: the first piece of an empty chain is
        // both bottom and top. Without this a later append would replace
        // pTopPiece without ever linking it to this piece.
        if (!this->pTopPiece)
        {
            this->pTopPiece = pNew;
        }
    }
    else
    {
        if (this->pTopPiece)
        {
            pNew->pBefore = this->pTopPiece;
            this->pTopPiece->pNext = pNew;
        }

        this->pTopPiece = pNew;

        if (!this->pBottomPiece)
        {
            this->pBottomPiece = pNew;
        }
    }

    this->DiscardAllocCaches();

    return pNew;
}
|
|
|
|
/**
 * Appends a terminal extended interceptor and seals the chain.
 *
 * Unlike the other Add* functions no output buffer or BlobWriter is created
 * for the piece. After a successful call bWrittenEnd is set, so every later
 * Add* call returns an empty pointer.
 *
 * @param pInterceptor interceptor to attach; must not be null
 * @return the new piece, or an empty pointer when the argument is null, the
 *         chain is already sealed, the stack has no source buffer, or an
 *         allocation fails
 */
AuSPtr<IProtocolPiece> ProtocolStack::AddEndInterceptor(const AuSPtr<IProtocolInterceptorEx> &pInterceptor)
{
    // Same argument check as the other Add* functions: a piece with neither
    // interceptor set would be dereferenced by DoTick().
    if (!pInterceptor)
    {
        SysPushErrorArg();
        return {};
    }

    AU_LOCK_GUARD(this->mutex);

    if (this->bWrittenEnd)
    {
        return {};
    }

    if (!this->pSourceBufer)
    {
        return {};
    }

    auto pNew = AuMakeShared<ProtocolPiece>();
    if (!pNew)
    {
        SysPushErrorNet("Out of memory");
        return {};
    }

    // Writer handed to the interceptor; everything written to it is ticked
    // through the stack starting at the owning piece.
    struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
    {
        AuWPtr<ProtocolStack> pStack;
        AuWPtr<IProtocolInterceptorEx> pInterceptor;
        AuWPtr<ProtocolPiece> pParent;

        EStreamError IsOpen() override
        {
            return EStreamError::eErrorNone;
        }

        EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
        {
            // NOTE(review): outVariable is zeroed and never updated on success;
            // confirm callers do not rely on a written-byte count.
            parameters.outVariable = 0;

            auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
            if (!pCopy ||
                !(*pCopy.get()))
            {
                return EStreamError::eErrorOutOfMemory;
            }

            // The stack is only weakly referenced; it may already be gone.
            auto pStack = AuTryLockMemoryType(this->pStack);
            if (!pStack)
            {
                return EStreamError::eErrorStreamInterrupted;
            }

            return pStack->DoTick(pCopy, this->pParent.lock()) ?
                   EStreamError::eErrorNone :
                   EStreamError::eErrorStreamInterrupted;
        }

        void Close() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->Terminate();
            }
        }

        virtual void Flush() override
        {
            if (auto pStack = AuTryLockMemoryType(this->pStack))
            {
                pStack->DoTick();
            }
        }

        virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
        {
            return {};
        }

        virtual AuSPtr<IStreamWriter> GetStreamWriter() override
        {
            return AuSharedFromThis();
        }
    };

    auto pWrapper = AuMakeShared<StreamWrapper>();
    if (!pWrapper)
    {
        SysPushErrorMemory();
        return {};
    }

    pWrapper->pInterceptor = pInterceptor;
    pWrapper->pStack = AuSharedFromThis();
    pWrapper->pParent = pNew;

    pNew->pWriteInteface = pWrapper;
    pNew->pInterceptorEx = pInterceptor;
    pNew->pParent = this;

    if (this->pTopPiece)
    {
        pNew->pBefore = this->pTopPiece;
        this->pTopPiece->pNext = pNew;
    }

    this->pTopPiece = pNew;

    if (!this->pBottomPiece)
    {
        this->pBottomPiece = pNew;
    }

    this->bWrittenEnd = true;
    this->DiscardAllocCaches();

    return pNew;
}
|
|
|
|
void ProtocolStack::Destroy()
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
|
|
if (this->bOwnsSource)
|
|
{
|
|
if (this->pSourceBufer)
|
|
{
|
|
this->pSourceBufer->Reset();
|
|
this->pSourceBufer.reset();
|
|
}
|
|
}
|
|
|
|
auto pItr = this->pBottomPiece;
|
|
while (pItr)
|
|
{
|
|
auto pCur = pItr;
|
|
pItr = pCur->pNext;
|
|
pCur->pNext.reset();
|
|
pCur->pBefore.reset();
|
|
pCur->pInterceptor.reset();
|
|
pCur->pInterceptorEx.reset();
|
|
pCur->pOuputWriter.reset();
|
|
pCur->pWriteInteface.reset();
|
|
pCur->PrivateUserDataClear();
|
|
}
|
|
|
|
this->pBottomPiece.reset();
|
|
this->pTopPiece.reset();
|
|
}
|
|
|
|
/**
 * Exposes the stack's write view as an IStreamWriter.
 *
 * @return an aliasing pointer to writeView that shares ownership with this
 *         stack, or an empty pointer when the stack does not own its source
 */
AuSPtr<IStreamWriter> ProtocolStack::AsStreamWriter()
{
    if (this->bOwnsSource)
    {
        return AuSPtr<IStreamWriter>(this->SharedFromThis(), &this->writeView);
    }

    return {};
}
|
|
|
|
/**
 * Gives direct access to the source buffer for writing.
 *
 * @return the source buffer, or an empty pointer when the stack does not own
 *         its source
 */
AuSPtr<Memory::ByteBuffer> ProtocolStack::AsWritableByteBuffer()
{
    if (this->bOwnsSource)
    {
        return this->pSourceBufer;
    }

    return {};
}
|
|
|
|
/**
 * Exposes the stack's read view as an IStreamReader.
 *
 * @return an aliasing pointer to readView that shares ownership with this
 *         stack, or an empty pointer when AsReadableByteBuffer() yields nothing
 */
AuSPtr<IStreamReader> ProtocolStack::AsStreamReader()
{
    if (!this->AsReadableByteBuffer())
    {
        return {};
    }

    return AuSPtr<IStreamReader>(this->SharedFromThis(), &this->readView);
}
|
|
|
|
/**
 * Returns the buffer that holds the stack's final output.
 *
 * @return the source buffer when the chain has no top piece; otherwise an
 *         aliasing pointer to the top piece's output buffer, or an empty
 *         pointer when that buffer reports IsEmpty()
 */
AuSPtr<Memory::ByteBuffer> ProtocolStack::AsReadableByteBuffer()
{
    auto pTop = this->pTopPiece;

    if (!pTop)
    {
        return this->pSourceBufer;
    }

    if (pTop->outputBuffer.IsEmpty())
    {
        return {};
    }

    return AuSPtr<AuByteBuffer>(pTop, &pTop->outputBuffer);
}
|
|
|
|
void ProtocolStack::Terminate()
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
|
|
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
|
|
{
|
|
(void)pStack->End();
|
|
}
|
|
|
|
this->bDead = true;
|
|
}
|
|
|
|
bool ProtocolStack::DoTick()
|
|
{
|
|
AU_LOCK_GUARD(this->mutex);
|
|
|
|
return this->DoTickEx(true);
|
|
}
|
|
|
|
bool ProtocolStack::DoTickEx(bool bPermitRead)
|
|
{
|
|
if (this->bDead)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
if (!this->pSourceBufer)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
if (!this->pBottomPiece)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
if (this->eType == EProtocolStackCreateType::eByBufferedAdhocStream)
|
|
{
|
|
if (!this->TryReadMore())
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if (!this->DoTick(this->pSourceBufer, {}))
|
|
{
|
|
if (this->bKillPipeOnFirstRootLevelFalse)
|
|
{
|
|
if (this->eType == EProtocolStackCreateType::eByPipe)
|
|
{
|
|
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
|
|
{
|
|
pStack->TerminateOnThread(true);
|
|
}
|
|
}
|
|
|
|
this->bDead = true;
|
|
}
|
|
else
|
|
{
|
|
if (this->eType == EProtocolStackCreateType::eByBufferedAdhocStream)
|
|
{
|
|
if (this->TryReadMore(true))
|
|
{
|
|
if (this->DoTick(this->pSourceBufer, {}))
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
return !this->bDead;
|
|
}
|
|
|
|
bool ProtocolStack::TryReadMore(bool bForce)
|
|
{
|
|
if (!bForce)
|
|
{
|
|
if (this->pSourceBufer->readPtr !=
|
|
this->pSourceBufer->writePtr)
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
|
|
auto pSource = this->pStreamReaderAsInputSource;
|
|
if (!pSource)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
auto uRemBytes = this->pSourceBufer->RemainingWrite();
|
|
auto uFragment = this->uPreferredFragmentSize ?
|
|
AuMin<AuUInt>(uRemBytes, this->uPreferredFragmentSize) :
|
|
uRemBytes;
|
|
|
|
AuUInt uBytesRead {};
|
|
if (AuIO::EStreamError::eErrorNone !=
|
|
this->pStreamReaderAsInputSource->Read({ AuStaticCast<AuMemoryViewWrite>(*this->pSourceBufer), uBytesRead }))
|
|
{
|
|
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
|
|
{
|
|
pStack->TerminateOnThread(true);
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
this->pSourceBufer->writePtr += uBytesRead;
|
|
return true;
|
|
}
|
|
|
|
bool ProtocolStack::DoTick(const AuSPtr<AuByteBuffer> &pRead, const AuSPtr<ProtocolPiece> &pPiece)
|
|
{
|
|
auto pCurrent = pPiece ? pPiece : this->pBottomPiece;
|
|
|
|
if (pCurrent->pInterceptorEx)
|
|
{
|
|
bool bTryAgain {};
|
|
bool bTryAgainAtleastOnce {};
|
|
do
|
|
{
|
|
auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ?
|
|
(this->pDrainBuffer) :
|
|
(!pCurrent->outputBuffer.IsEmpty() ? AuSPtr<AuByteBuffer>(pCurrent, &pCurrent->outputBuffer) : AuSPtr<AuByteBuffer> {});
|
|
|
|
auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr;
|
|
|
|
if (pRead)
|
|
{
|
|
if (pRead->flagReadError)
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool bStatus = pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream, pCurrent);
|
|
|
|
if (!bStatus)
|
|
{
|
|
if (pNextStream)
|
|
{
|
|
pNextStream->readPtr = pOldHead;
|
|
}
|
|
|
|
return bTryAgainAtleastOnce;
|
|
}
|
|
else
|
|
{
|
|
if (pCurrent->uMax.has_value())
|
|
{
|
|
auto bytesRem = pCurrent->outputBuffer.RemainingBytes();
|
|
auto maxVal = pCurrent->uMax.value();
|
|
if (bytesRem >= maxVal)
|
|
{
|
|
this->Terminate();
|
|
return false;
|
|
}
|
|
|
|
auto target = AuMin(AuPageRound<AuUInt>(bytesRem + ((bytesRem / 4) * 3), 128), maxVal);
|
|
target = AuMax<AuUInt>(target, pCurrent->uStartingSize);
|
|
|
|
if (target >= pCurrent->outputBuffer.allocSize ||
|
|
((pCurrent->outputBuffer.length > target) && (bytesRem < pCurrent->uStartingSize)))
|
|
{
|
|
if (pCurrent->outputBuffer.Resize(target))
|
|
{
|
|
pCurrent->outputBuffer.flagAlwaysExpandable =
|
|
pCurrent->outputBuffer.flagExpandable = 1;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!pNextStream)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
if (auto pNext = pCurrent->pNext)
|
|
{
|
|
auto pOldHead = pNextStream->readPtr;
|
|
|
|
bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext);
|
|
|
|
if (!bDontCareAboutSubTicks)
|
|
{
|
|
pNextStream->readPtr = pOldHead;
|
|
}
|
|
}
|
|
|
|
if (pCurrent->bMultipleTick)
|
|
{
|
|
bTryAgain = pRead->RemainingBytes();
|
|
bTryAgainAtleastOnce |= bTryAgain;
|
|
}
|
|
|
|
if (pCurrent->outputBuffer)
|
|
{
|
|
if (pCurrent->outputBuffer.flagWriteError)
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
while (bTryAgain && this->IsValid());
|
|
|
|
return true;
|
|
}
|
|
else
|
|
{
|
|
AuUInt8 *pBase {};
|
|
AuUInt uCount {};
|
|
|
|
if (pRead->writePtr >= pRead->readPtr)
|
|
{
|
|
uCount = pRead->writePtr - pRead->readPtr;
|
|
pBase = pRead->readPtr;
|
|
}
|
|
else if (pRead->flagCircular)
|
|
{
|
|
uCount = (pRead->base + pRead->length) - pRead->readPtr;
|
|
pBase = pRead->readPtr;
|
|
}
|
|
|
|
auto pNextStream = pCurrent->GetNextWriter();
|
|
|
|
if (!pCurrent->pInterceptor->OnDataAvailable({ pBase, uCount }, pNextStream))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
}
|
|
|
|
/**
 * Snapshots the piece chain under the stack mutex.
 *
 * @return every piece, ordered from the bottom of the chain to the top
 */
AuList<AuSPtr<IProtocolPiece>> ProtocolStack::GetArrayOfInterceptors()
{
    AU_LOCK_GUARD(this->mutex);

    AuList<AuSPtr<IProtocolPiece>> pieces;

    for (auto pPiece = this->pBottomPiece; pPiece; pPiece = pPiece->pNext)
    {
        pieces.push_back(pPiece);
    }

    return pieces;
}
|
|
|
|
/**
 * Looks up a piece by its position in the chain, under the stack mutex.
 *
 * @param uIndex zero-based position counted from the bottom piece
 * @return the piece at that position, or an empty pointer when the chain is shorter
 */
AuSPtr<IProtocolPiece> ProtocolStack::GetInterceptorAtIndex(AuUInt32 uIndex)
{
    AU_LOCK_GUARD(this->mutex);

    AuUInt32 uPosition {};

    for (auto pPiece = this->pBottomPiece; pPiece; pPiece = pPiece->pNext, ++uPosition)
    {
        if (uPosition == uIndex)
        {
            return pPiece;
        }
    }

    return {};
}
|
|
|
|
bool ProtocolStack::IsValid()
|
|
{
|
|
return !this->bDead;
|
|
}
|
|
|
|
void ProtocolStack::DiscardAllocCaches()
|
|
{
|
|
AuResetMember(this->pStreamReaderCache);
|
|
AuResetMember(this->pStreamWriterCache);
|
|
}
|
|
|
|
/**
 * Creates a stack of type eByBuffered that owns a freshly allocated source
 * buffer of uLength bytes.
 *
 * @param uLength source buffer length; must be non-zero
 * @param ex      extra flags: bStreamReaderFlagAdhocPump is copied, and
 *                bKillPipeOnFirstRootLevelFalse is applied when it holds a value
 * @return the new stack, or an empty pointer when uLength is zero or an
 *         allocation fails (an error is pushed on every failure path)
 */
static AuSPtr<IProtocolStack> _NewBufferedProtocolStack(AuUInt uLength,
                                                        ProtocolStackExtraData ex)
{
    if (!uLength)
    {
        SysPushErrorArg();
        return {};
    }

    auto pStack = AuMakeShared<ProtocolStack>();
    if (!pStack)
    {
        SysPushErrorMem();
        return {};
    }

    pStack->eType = EProtocolStackCreateType::eByBuffered;

    auto pBuffer = AuMakeShared<AuByteBuffer>(uLength, true);
    if (!pBuffer)
    {
        SysPushErrorMem();
        return {};
    }

    if (!pBuffer->IsValid())
    {
        // Report the failed backing allocation like every other failure path.
        SysPushErrorMem();
        return {};
    }

    pStack->bOwnsSource = true;
    pStack->pSourceBufer = pBuffer;

    pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;

    if (ex.bKillPipeOnFirstRootLevelFalse)
    {
        pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
    }

    return pStack;
}
|
|
|
|
/**
 * Exported factory for a buffered stack with default extra flags.
 *
 * @param uLength source buffer length
 * @return the new stack, or an empty pointer on failure
 */
AUKN_SYM AuSPtr<IProtocolStack> NewBufferedProtocolStack(AuUInt uLength)
{
    auto pStack = _NewBufferedProtocolStack(uLength, {});
    return pStack;
}
|
|
|
|
/**
 * Creates a stack of type eByPipe whose source buffer is the pipe work's buffer.
 *
 * With bAutoTick the stack is registered on the pipe work and the source
 * buffer is wrapped with AuUnsafeRaiiToShared. Without it the source buffer is
 * an aliasing pointer that shares ownership with pWork.
 *
 * @param pWork                          pipe work to read from; must not be null
 * @param bAutoTick                      register the stack on the pipe work
 * @param bKillPipeOnFirstRootLevelFalse default for the stack's flag of the same
 *                                       name; overridden by ex when ex holds a value
 * @param ex                             extra flags
 * @return the new stack, or an empty pointer when pWork is null, allocation
 *         fails, or bAutoTick is requested on a pipe that already has a stack
 */
static AuSPtr<IProtocolStack> _NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork,
                                                        bool bAutoTick,
                                                        bool bKillPipeOnFirstRootLevelFalse,
                                                        ProtocolStackExtraData ex)
{
    if (!pWork)
    {
        SysPushErrorArg();
        return {};
    }

    auto pStack = AuMakeShared<ProtocolStack>();
    if (!pStack)
    {
        SysPushErrorMem();
        return {};
    }

    auto pWorkEx = AuStaticCast<IOPipeWork>(pWork);

    pStack->eType = EProtocolStackCreateType::eByPipe;
    pStack->pPipeWork = pWorkEx;
    pStack->bKillPipeOnFirstRootLevelFalse = bKillPipeOnFirstRootLevelFalse;

    if (ex.bKillPipeOnFirstRootLevelFalse)
    {
        pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
    }

    pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;

    if (!bAutoTick)
    {
        pStack->pSourceBufer = AuSPtr<AuByteBuffer>(pWork, pWorkEx->GetBuffer());
        return pStack;
    }

    if (pWorkEx->pProtocolStack)
    {
        SysPushErrorGeneric("NewProtocolStackFromPipe called twice. wont recv DoTick");
        return {};
    }

    pWorkEx->pProtocolStack = pStack;
    pStack->pSourceBufer = AuUnsafeRaiiToShared(pWorkEx->GetBuffer());
    return pStack;
}
|
|
|
|
/**
 * Creates a stack of type eByBufferedAdhocStream: an owned source buffer that
 * TryReadMore() refills from data.pStreamReader.
 *
 * @param data stream description; uBufferedStreamLength must be non-zero
 * @param ex   extra flags: the ad-hoc pump and read flags are copied, and
 *             bKillPipeOnFirstRootLevelFalse is applied when it holds a value
 * @return the new stack, or an empty pointer when the length is zero or an
 *         allocation fails (an error is pushed on every failure path)
 */
static AuSPtr<IProtocolStack> _NewProtocolStackFromStream(const ProtocolStackByBufferedAdhoc &data,
                                                          ProtocolStackExtraData ex)
{
    // Validate the argument before allocating anything.
    if (!data.uBufferedStreamLength)
    {
        SysPushErrorArg();
        return {};
    }

    auto pStack = AuMakeShared<ProtocolStack>();
    if (!pStack)
    {
        SysPushErrorMem();
        return {};
    }

    pStack->eType = EProtocolStackCreateType::eByBufferedAdhocStream;

    auto pBuffer = AuMakeShared<AuByteBuffer>(data.uBufferedStreamLength, true);
    if (!pBuffer)
    {
        SysPushErrorMem();
        return {};
    }

    if (!pBuffer->IsValid())
    {
        // Report the failed backing allocation like every other failure path.
        SysPushErrorMem();
        return {};
    }

    if (ex.bKillPipeOnFirstRootLevelFalse)
    {
        pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
    }

    pStack->bOwnsSource = true;
    pStack->pSourceBufer = pBuffer;
    pStack->pStreamReaderAsInputSource = data.pStreamReader;
    pStack->uPreferredFragmentSize = data.uPreferredFragmentSize;

    pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;
    pStack->bStreamReaderFlagAdhocRead = ex.bStreamReaderFlagAdhocReadFromAdhoc;

    return pStack;
}
|
|
|
|
/**
 * Exported factory for a pipe-backed stack with default extra flags.
 *
 * @param pWork                          pipe work to read from
 * @param bAutoTick                      register the stack on the pipe work
 * @param bKillPipeOnFirstRootLevelFalse value for the stack's flag of the same name
 * @return the new stack, or an empty pointer on failure
 */
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork,
                                                         bool bAutoTick,
                                                         bool bKillPipeOnFirstRootLevelFalse)
{
    auto pStack = _NewProtocolStackFromPipe(pWork, bAutoTick, bKillPipeOnFirstRootLevelFalse, {});
    return pStack;
}
|
|
|
|
/**
 * Exported factory that dispatches on description.type to the matching
 * internal constructor, passing description.extendedFlags along.
 *
 * @param description stack description; its type must be a valid
 *                    EProtocolStackCreateType
 * @return the new stack, or an empty pointer when the type is invalid or the
 *         selected constructor fails
 */
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromDescription(const ProtocolStackDescription &description)
{
    if (!EProtocolStackCreateTypeIsValid(description.type))
    {
        SysPushErrorArg();
        return {};
    }

    const auto &flags = description.extendedFlags;

    switch (description.type)
    {
    case EProtocolStackCreateType::eByPipe:
    {
        const auto &byPipe = description.byPipe;
        return _NewProtocolStackFromPipe(byPipe.pWork, byPipe.bAutoTick, byPipe.bKillPipeOnFirstRootLevelFalse, flags);
    }
    case EProtocolStackCreateType::eByBuffered:
    {
        return _NewBufferedProtocolStack(description.byBuffered.uBufferedStreamLength, flags);
    }
    case EProtocolStackCreateType::eByBufferedAdhocStream:
    {
        return _NewProtocolStackFromStream(description.byBufferedStream, flags);
    }
    default:
        // The validity check above rejects every other value.
        SysUnreachable();
    }
}
|
|
|
|
/**
 * Builds an ad-hoc stream stack over pStreamReader, appends one piece per
 * entry of pairs, and returns the stack's stream reader.
 *
 * Entries flagged single-frame are added with AddSingleFrameProcessorEx().
 * Other entries use AddInterceptorEx(), except when the entry is the last one,
 * which is added with AddEndInterceptor() (its buffer size is not used).
 *
 * NOTE(review): AddEndInterceptor() creates no output buffer for its piece,
 * and AsStreamReader() returns empty when the top piece's buffer IsEmpty() —
 * confirm this path yields a usable reader.
 *
 * @param pStreamReader          input reader the stack pulls from
 * @param pairs                  (bSingleFrame, nextBufferSize, interceptor) per piece
 * @param uBaseStreamLength      length of the stack's source buffer
 * @param uPreferredFragmentSize stored on the stack as its preferred fragment size
 * @param pOptOutStack           optional; receives the stack on success
 * @return the stack's stream reader, or an empty pointer on any failure
 */
AUKN_SYM AuSPtr<IStreamReader> UtilityWrapStreamReaderWithInterceptors(AuSPtr<IStreamReader> pStreamReader,
                                                                       AuList<AuTuple<bool /* bSingleFrame*/, AuUInt /*nextBufferSize*/, AuSPtr<IProtocolInterceptorEx>>> pairs,
                                                                       AuUInt uBaseStreamLength,
                                                                       AuUInt uPreferredFragmentSize,
                                                                       AuSPtr<IProtocolStack> *pOptOutStack)
{
    ProtocolStackDescription desc;
    desc.type = EProtocolStackCreateType::eByBufferedAdhocStream;
    desc.byBufferedStream.pStreamReader = pStreamReader;
    desc.byBufferedStream.uBufferedStreamLength = uBaseStreamLength;
    desc.byBufferedStream.uPreferredFragmentSize = uPreferredFragmentSize;
    desc.extendedFlags.bStreamReaderFlagAdhocReadFromAdhoc = true;

    AuSPtr<IProtocolStack> pStack = NewProtocolStackFromDescription(desc);
    if (!pStack)
    {
        SysPushErrorNested();
        return {};
    }

    for (AU_ITERATE_N(i, pairs.size()))
    {
        const auto &[bSingle, uNextBufferSize, pPointer] = pairs[i];

        AuSPtr<IProtocolPiece> pAdded;

        if (bSingle)
        {
            pAdded = pStack->AddSingleFrameProcessorEx(EProtocolWhere::eAppend, pPointer, uNextBufferSize);
        }
        else if (i == pairs.size() - 1)
        {
            pAdded = pStack->AddEndInterceptor(pPointer);
        }
        else
        {
            pAdded = pStack->AddInterceptorEx(EProtocolWhere::eAppend, pPointer, uNextBufferSize);
        }

        if (!pAdded)
        {
            SysPushErrorNested();
            return {};
        }
    }

    if (pOptOutStack)
    {
        *pOptOutStack = pStack;
    }

    return pStack->AsStreamReader();
}
|
|
} |