AuroraRuntime/Source/IO/Protocol/AuProtocolStack.cpp

777 lines
21 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuProtocolStack.cpp
Date: 2022-8-24
Author: Reece
***/
#include "Protocol.hpp"
#include "AuProtocolStack.hpp"
#include "AuProtocolPiece.hpp"
#include "IProtocolNext.hpp"
#include <Source/IO/AuIOPipeProcessor.hpp>
namespace Aurora::IO::Protocol
{
ProtocolStack::~ProtocolStack()
{
}
AuSPtr<IProtocolPiece> ProtocolStack::AppendInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhere(false, pInterceptor, uOutputBufferSize);
}
AuSPtr<IProtocolPiece> ProtocolStack::AppendInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, false);
}
AuSPtr<IProtocolPiece> ProtocolStack::AppendSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx)
{
return this->AddInterceptorWhereEx(false, pInterceptorEx, 0, true);
}
AuSPtr<IProtocolPiece> ProtocolStack::AppendSingleFrameProcessorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, true);
}
AuSPtr<IProtocolPiece> ProtocolStack::PrependSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx)
{
return this->AddInterceptorWhereEx(true, pInterceptorEx, 0, true);
}
AuSPtr<IProtocolPiece> ProtocolStack::PrependSingleFrameProcessorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, true);
}
AuSPtr<IProtocolPiece> ProtocolStack::PrependInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhere(true, pInterceptor, uOutputBufferSize);
}
AuSPtr<IProtocolPiece> ProtocolStack::PrependInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx,
AuUInt uOutputBufferSize)
{
return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, false);
}
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhere(bool bPrepend,
const AuSPtr<IProtocolInterceptor> &pInterceptor,
AuUInt uOutputBufferSize)
{
if (this->bWrittenEnd)
{
return {};
}
if (!uOutputBufferSize)
{
uOutputBufferSize = 64 * 1024;
}
auto pNew = AuMakeShared<ProtocolPiece>();
if (!pNew)
{
SysPushErrorNet("Out of memory");
return {};
}
pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
if (!pNew->outputBuffer.IsValid())
{
SysPushErrorNet("Out of memory");
return {};
}
// Circular ref
pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));
struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
{
AuWPtr<ProtocolStack> pStack;
AuWPtr<IProtocolInterceptor> pInterceptor;
AuWPtr<ProtocolPiece> pParent;
EStreamError IsOpen() override
{
return EStreamError::eErrorNone;
}
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
{
return pStack.lock()->DoTick(AuMakeShared<AuByteBuffer>(parameters), pParent.lock()) ?
EStreamError::eErrorNone :
EStreamError::eErrorStreamInterrupted;
}
void Close() override
{
if (auto pStack = AuTryLockMemoryType(this->pStack))
{
pStack->Terminate();
}
};
virtual void Flush() override
{
if (auto pStack = AuTryLockMemoryType(this->pStack))
{
pStack->DoTick();
}
}
virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
{
return {};
}
virtual AuSPtr<IStreamWriter> GetStreamWriter() override
{
return AuSharedFromThis();
}
};
auto pWrapper = AuMakeShared<StreamWrapper>();
if (!pWrapper)
{
SysPushErrorMemory();
return {};
}
pWrapper->pInterceptor = pInterceptor;
pWrapper->pStack = AuSharedFromThis();
pWrapper->pParent = pNew;
pNew->pWriteInteface = pWrapper;
pNew->pParent = this;
pNew->pInterceptor = pInterceptor;
if (bPrepend)
{
if (this->pBottomPiece)
{
pNew->pNext = this->pBottomPiece;
}
this->pBottomPiece = pNew;
}
else
{
if (this->pTopPiece)
{
this->pTopPiece->pNext = pNew;
}
this->pTopPiece = pNew;
if (!this->pBottomPiece)
{
this->pBottomPiece = pNew;
}
}
return pNew;
}
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhereEx(bool bPrepend,
const AuSPtr<IProtocolInterceptorEx> &pInterceptor,
AuUInt uOutputBufferSize,
bool bMultipleTick)
{
if (this->bWrittenEnd)
{
return {};
}
if (!this->pSourceBufer)
{
return {};
}
if (!uOutputBufferSize)
{
uOutputBufferSize = 64 * 1024;
}
auto pNew = AuMakeShared<ProtocolPiece>();
if (!pNew)
{
SysPushErrorNet("Out of memory");
return {};
}
pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
if (!pNew->outputBuffer.IsValid())
{
SysPushErrorNet("Out of memory");
return {};
}
// Circular ref
pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));
struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
{
AuWPtr<ProtocolStack> pStack;
AuWPtr<IProtocolInterceptorEx> pInterceptor;
AuWPtr<ProtocolPiece> pParent;
EStreamError IsOpen() override
{
return EStreamError::eErrorNone;
}
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
{
return pStack.lock()->DoTick(AuMakeShared<AuByteBuffer>(parameters), pParent.lock()) ?
EStreamError::eErrorNone :
EStreamError::eErrorStreamInterrupted;
}
void Close() override
{
if (auto pStack = AuTryLockMemoryType(this->pStack))
{
pStack->Terminate();
}
}
virtual void Flush() override
{
if (auto pStack = AuTryLockMemoryType(this->pStack))
{
pStack->DoTick();
}
}
virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
{
return {};
}
virtual AuSPtr<IStreamWriter> GetStreamWriter() override
{
return AuSharedFromThis();
}
};
auto pWrapper = AuMakeShared<StreamWrapper>();
if (!pWrapper)
{
SysPushErrorMemory();
return {};
}
pWrapper->pInterceptor = pInterceptor;
pWrapper->pStack = AuSharedFromThis();
pWrapper->pParent = pNew;
pNew->pWriteInteface = pWrapper;
pNew->pInterceptorEx = pInterceptor;
pNew->pParent = this;
pNew->bMultipleTick = bMultipleTick;
if (bPrepend)
{
if (this->pBottomPiece)
{
pNew->pNext = this->pBottomPiece;
}
this->pBottomPiece = pNew;
}
else
{
if (this->pTopPiece)
{
this->pTopPiece->pNext = pNew;
}
this->pTopPiece = pNew;
if (!this->pBottomPiece)
{
this->pBottomPiece = pNew;
}
}
return pNew;
}
AuSPtr<IProtocolPiece> ProtocolStack::AddEndInterceptor(const AuSPtr<IProtocolInterceptorEx> &pInterceptor)
{
if (this->bWrittenEnd)
{
return {};
}
if (!this->pSourceBufer)
{
return {};
}
auto pNew = AuMakeShared<ProtocolPiece>();
if (!pNew)
{
SysPushErrorNet("Out of memory");
return {};
}
struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
{
AuWPtr<ProtocolStack> pStack;
AuWPtr<IProtocolInterceptorEx> pInterceptor;
AuWPtr<ProtocolPiece> pParent;
EStreamError IsOpen() override
{
return EStreamError::eErrorNone;
}
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
{
return pStack.lock()->DoTick(AuMakeShared<AuByteBuffer>(parameters), pParent.lock()) ?
EStreamError::eErrorNone :
EStreamError::eErrorStreamInterrupted;
}
void Close() override
{
if (auto pStack = AuTryLockMemoryType(this->pStack))
{
pStack->Terminate();
}
}
virtual void Flush() override
{
if (auto pStack = AuTryLockMemoryType(this->pStack))
{
pStack->DoTick();
}
}
virtual AuSPtr<Memory::ByteBuffer> GetOutputBuffer() override
{
return {};
}
virtual AuSPtr<IStreamWriter> GetStreamWriter() override
{
return AuSharedFromThis();
}
};
auto pWrapper = AuMakeShared<StreamWrapper>();
if (!pWrapper)
{
SysPushErrorMemory();
return {};
}
pWrapper->pInterceptor = pInterceptor;
pWrapper->pStack = AuSharedFromThis();
pWrapper->pParent = pNew;
pNew->pWriteInteface = pWrapper;
pNew->pInterceptorEx = pInterceptor;
pNew->pParent = this;
if (this->pTopPiece)
{
this->pTopPiece->pNext = pNew;
}
this->pTopPiece = pNew;
if (!this->pBottomPiece)
{
this->pBottomPiece = pNew;
}
this->bWrittenEnd = true;
return pNew;
}
void ProtocolStack::Destroy()
{
if (this->bOwnsSource)
{
if (this->pSourceBufer)
{
this->pSourceBufer->Reset();
this->pSourceBufer.reset();
}
}
auto pItr = this->pBottomPiece;
while (pItr)
{
auto pCur = pItr;
pItr = pCur->pNext;
pCur->pNext.reset();
pCur->pInterceptor.reset();
pCur->pInterceptorEx.reset();
pCur->pOuputWriter.reset();
pCur->pWriteInteface.reset();
}
this->pBottomPiece.reset();
this->pTopPiece.reset();
}
AuSPtr<IStreamWriter> ProtocolStack::AsStreamWriter()
{
if (!this->bOwnsSource)
{
return {};
}
if (this->pStreamWriterCache)
{
return this->pStreamWriterCache;
}
struct A : AuIO::Buffered::BlobWriter
{
AuWPtr<ProtocolStack> wpStack;
A(AuSPtr<AuByteBuffer> pBuffer,
AuWPtr<ProtocolStack> wpStack) :
AuIO::Buffered::BlobWriter(pBuffer),
wpStack(wpStack)
{
}
virtual void Close() override
{
if (auto pStack = AuTryLockMemoryType(wpStack))
{
pStack->Terminate();
}
}
virtual void Flush() override
{
if (auto pStack = AuTryLockMemoryType(wpStack))
{
pStack->DoTick();
}
}
};
return this->pStreamWriterCache = AuMakeShared<A>(this->pSourceBufer, this->SharedFromThis());
}
AuSPtr<Memory::ByteBuffer> ProtocolStack::AsWritableByteBuffer()
{
if (!this->bOwnsSource)
{
return {};
}
return this->pSourceBufer;
}
AuSPtr<IStreamReader> ProtocolStack::AsStreamReader()
{
auto pBuffer = AsReadableByteBuffer();
if (!pBuffer)
{
return {};
}
if (this->pStreamReaderCache)
{
return this->pStreamReaderCache;
}
struct A : AuIO::Buffered::BlobReader
{
AuWPtr<ProtocolStack> wpStack;
A(AuSPtr<AuByteBuffer> pBuffer,
AuWPtr<ProtocolStack> wpStack) :
AuIO::Buffered::BlobReader(pBuffer),
wpStack(wpStack)
{
}
virtual void Close() override
{
if (auto pStack = AuTryLockMemoryType(wpStack))
{
pStack->Terminate();
}
}
};
return this->pStreamReaderCache = AuMakeShared<A>(pBuffer, this->SharedFromThis());
}
AuSPtr<Memory::ByteBuffer> ProtocolStack::AsReadableByteBuffer()
{
if (!this->pTopPiece)
{
return this->pSourceBufer;
}
if (this->pTopPiece->outputBuffer.IsEmpty())
{
return {};
}
return AuSPtr<AuByteBuffer>(this->pTopPiece, &this->pTopPiece->outputBuffer);
}
void ProtocolStack::Terminate()
{
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
{
(void)pStack->End();
}
}
bool ProtocolStack::DoTick()
{
if (!this->pSourceBufer)
{
return false;
}
if (!this->pBottomPiece)
{
return false;
}
if (!this->DoTick(this->pSourceBufer, {}))
{
if (this->bKillPipeOnFirstRootLevelFalse)
{
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
{
pStack->TerminateOnThread(true);
}
}
return false;
}
return true;
}
bool ProtocolStack::DoTick(const AuSPtr<AuByteBuffer> &pRead, const AuSPtr<ProtocolPiece> &pPiece)
{
auto pCurrent = pPiece ? pPiece : this->pBottomPiece;
if (pCurrent->pInterceptorEx)
{
bool bTryAgain {};
bool bTryAgainAtleastOnce {};
do
{
auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ?
(this->pDrainBuffer) :
(!pCurrent->outputBuffer.IsEmpty() ? AuSPtr<AuByteBuffer>(pCurrent, &pCurrent->outputBuffer) : AuSPtr<AuByteBuffer> {});
auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr;
if (pRead)
{
if (pRead->flagReadError)
{
return false;
}
}
if (!pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream, pCurrent))
{
if (pNextStream)
{
pNextStream->readPtr = pOldHead;
}
return bTryAgainAtleastOnce;
}
if (!pNextStream)
{
return true;
}
if (auto pNext = pCurrent->pNext)
{
auto pOldHead = pNextStream->readPtr;
bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext);
if (!bDontCareAboutSubTicks)
{
pNextStream->readPtr = pOldHead;
}
}
if (pCurrent->bMultipleTick)
{
bTryAgain = pRead->RemainingBytes();
bTryAgainAtleastOnce |= bTryAgain;
}
if (pCurrent->outputBuffer)
{
if (pCurrent->outputBuffer.flagWriteError)
{
return false;
}
}
}
while (bTryAgain);
return true;
}
else
{
AuUInt8 *pBase {};
AuUInt uCount {};
if (pRead->writePtr >= pRead->readPtr)
{
uCount = pRead->writePtr - pRead->readPtr;
pBase = pRead->readPtr;
}
else if (pRead->flagCircular)
{
uCount = (pRead->base + pRead->length) - pRead->readPtr;
pBase = pRead->readPtr;
}
auto pNextStream = pCurrent->ToNextWriter();
if (!pCurrent->pInterceptor->OnDataAvailable({ pBase, uCount }, pNextStream))
{
return false;
}
return true;
}
}
AuList<AuSPtr<IProtocolPiece>> ProtocolStack::GetArrayOfInterceptors()
{
AuList<AuSPtr<IProtocolPiece>> list;
auto pItr = this->pBottomPiece;
while (pItr)
{
auto pCur = pItr;
pItr = pCur->pNext;
list.push_back(pCur);
}
return list;
}
AuSPtr<IProtocolPiece> ProtocolStack::GetInterceptorAtIndex(AuUInt32 uIndex)
{
AuUInt32 uCounter {};
auto pItr = this->pBottomPiece;
while (pItr)
{
auto pCur = pItr;
pItr = pCur->pNext;
if (uIndex == uCounter++)
{
return pCur;
}
}
return {};
}
AUKN_SYM AuSPtr<IProtocolStack> NewBufferedProtocolStack(AuUInt uLength)
{
if (!uLength)
{
SysPushErrorArg();
return {};
}
auto pStack = AuMakeShared<ProtocolStack>();
if (!pStack)
{
SysPushErrorMem();
return {};
}
auto pBuffer = AuMakeShared<AuByteBuffer>(uLength, true);
if (!pBuffer)
{
SysPushErrorMem();
return {};
}
if (!pBuffer->IsValid())
{
return {};
}
pStack->bOwnsSource = true;
pStack->pSourceBufer = pBuffer;
return pStack;
}
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork, bool bAutoTick, bool bKillPipeOnFirstRootLevelFalse)
{
if (!pWork)
{
SysPushErrorArg();
return {};
}
auto pStack = AuMakeShared<ProtocolStack>();
if (!pStack)
{
SysPushErrorMem();
return {};
}
auto pWorkEx = AuStaticCast<IOPipeWork>(pWork);
pStack->pPipeWork = pWorkEx;
pStack->bKillPipeOnFirstRootLevelFalse = bKillPipeOnFirstRootLevelFalse;
if (bAutoTick)
{
if (pWorkEx->pProtocolStack)
{
SysPushErrorGeneric("NewProtocolStackFromPipe called twice. wont recv DoTick");
return {};
}
else
{
pWorkEx->pProtocolStack = pStack;
pStack->pSourceBufer = AuUnsafeRaiiToShared(pWorkEx->GetBuffer());
return pStack;
}
}
pStack->pSourceBufer = AuSPtr<AuByteBuffer>(pWork, AuStaticCast<IOPipeWork>(pWork)->GetBuffer());
return pStack;
}
}