/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuProtocolStack.cpp Date: 2022-8-24 Author: Reece ***/ #include "Protocol.hpp" #include "AuProtocolStack.hpp" #include "AuProtocolPiece.hpp" #include "IProtocolNext.hpp" #include namespace Aurora::IO::Protocol { ProtocolStack::~ProtocolStack() { } AuSPtr ProtocolStack::AppendInterceptor(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize) { return this->AddInterceptorWhere(false, pInterceptor, uOutputBufferSize); } AuSPtr ProtocolStack::AppendInterceptorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) { return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, false); } AuSPtr ProtocolStack::AppendSingleFrameProcessor(const AuSPtr &pInterceptorEx) { return this->AddInterceptorWhereEx(false, pInterceptorEx, 0, true); } AuSPtr ProtocolStack::AppendSingleFrameProcessorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) { return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, true); } AuSPtr ProtocolStack::PrependSingleFrameProcessor(const AuSPtr &pInterceptorEx) { return this->AddInterceptorWhereEx(true, pInterceptorEx, 0, true); } AuSPtr ProtocolStack::PrependSingleFrameProcessorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) { return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, true); } AuSPtr ProtocolStack::PrependInterceptor(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize) { return this->AddInterceptorWhere(true, pInterceptor, uOutputBufferSize); } AuSPtr ProtocolStack::PrependInterceptorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize) { return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, false); } AuSPtr ProtocolStack::AddInterceptorWhere(bool bPrepend, const AuSPtr &pInterceptor, AuUInt uOutputBufferSize) { if (this->bWrittenEnd) { return {}; } if (!uOutputBufferSize) { uOutputBufferSize = 64 * 1024; } auto pNew = AuMakeShared(); if (!pNew) { SysPushErrorNet("Out of memory"); return {}; } pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false); if (!pNew->outputBuffer.IsValid()) { SysPushErrorNet("Out of memory"); return {}; } // Circular ref pNew->pOuputWriter = AuMakeShared(AuSPtr(pNew, &pNew->outputBuffer)); struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis { AuWPtr pStack; AuWPtr pInterceptor; AuWPtr pParent; EStreamError IsOpen() override { return EStreamError::eErrorNone; } EStreamError Write(const Memory::MemoryViewStreamRead ¶meters) override { return pStack.lock()->DoTick(AuMakeShared(parameters), pParent.lock()) ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted; } void Close() override { if (auto pStack = AuTryLockMemoryType(this->pStack)) { pStack->Terminate(); } }; void Flush() override { }; virtual AuSPtr GetOutputBuffer() override { return {}; } virtual AuSPtr GetStreamWriter() override { return AuSharedFromThis(); } }; auto pWrapper = AuMakeShared(); if (!pWrapper) { SysPushErrorMemory(); return {}; } pWrapper->pInterceptor = pInterceptor; pWrapper->pStack = AuSharedFromThis(); pWrapper->pParent = pNew; pNew->pWriteInteface = pWrapper; pNew->pParent = this; pNew->pInterceptor = pInterceptor; if (bPrepend) { if (this->pBottomPiece) { pNew->pNext = this->pBottomPiece; } this->pBottomPiece = pNew; } else { if (this->pTopPiece) { this->pTopPiece->pNext = pNew; } this->pTopPiece = pNew; if (!this->pBottomPiece) { this->pBottomPiece = pNew; } } return pNew; } AuSPtr ProtocolStack::AddInterceptorWhereEx(bool bPrepend, const AuSPtr &pInterceptor, AuUInt uOutputBufferSize, bool bMultipleTick) { if (this->bWrittenEnd) { return {}; } if (!this->pSourceBufer) { return {}; } if (!uOutputBufferSize) { uOutputBufferSize = 64 * 1024; } auto pNew = AuMakeShared(); if (!pNew) { SysPushErrorNet("Out of memory"); return {}; } pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false); if (!pNew->outputBuffer.IsValid()) { SysPushErrorNet("Out of memory"); return {}; } // Circular ref pNew->pOuputWriter = AuMakeShared(AuSPtr(pNew, &pNew->outputBuffer)); struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis { AuWPtr pStack; AuWPtr pInterceptor; AuWPtr pParent; EStreamError IsOpen() override { return EStreamError::eErrorNone; } EStreamError Write(const Memory::MemoryViewStreamRead ¶meters) override { return pStack.lock()->DoTick(AuMakeShared(parameters), pParent.lock()) ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted; } void Close() override { if (auto pStack = AuTryLockMemoryType(this->pStack)) { pStack->Terminate(); } }; void Flush() override { }; virtual AuSPtr GetOutputBuffer() override { return {}; } virtual AuSPtr GetStreamWriter() override { return AuSharedFromThis(); } }; auto pWrapper = AuMakeShared(); if (!pWrapper) { SysPushErrorMemory(); return {}; } pWrapper->pInterceptor = pInterceptor; pWrapper->pStack = AuSharedFromThis(); pWrapper->pParent = pNew; pNew->pWriteInteface = pWrapper; pNew->pInterceptorEx = pInterceptor; pNew->pParent = this; pNew->bMultipleTick = bMultipleTick; if (bPrepend) { if (this->pBottomPiece) { pNew->pNext = this->pBottomPiece; } this->pBottomPiece = pNew; } else { if (this->pTopPiece) { this->pTopPiece->pNext = pNew; } this->pTopPiece = pNew; if (!this->pBottomPiece) { this->pBottomPiece = pNew; } } return pNew; } AuSPtr ProtocolStack::AddEndInterceptor(const AuSPtr &pInterceptor) { if (this->bWrittenEnd) { return {}; } if (!this->pSourceBufer) { return {}; } auto pNew = AuMakeShared(); if (!pNew) { SysPushErrorNet("Out of memory"); return {}; } struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis { AuWPtr pStack; AuWPtr pInterceptor; AuWPtr pParent; EStreamError IsOpen() override { return EStreamError::eErrorNone; } EStreamError Write(const Memory::MemoryViewStreamRead ¶meters) override { return pStack.lock()->DoTick(AuMakeShared(parameters), pParent.lock()) ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted; } void Close() override { if (auto pStack = AuTryLockMemoryType(this->pStack)) { pStack->Terminate(); } }; void Flush() override { }; virtual AuSPtr GetOutputBuffer() override { return {}; } virtual AuSPtr GetStreamWriter() override { return AuSharedFromThis(); } }; auto pWrapper = AuMakeShared(); if (!pWrapper) { SysPushErrorMemory(); return {}; } pWrapper->pInterceptor = pInterceptor; pWrapper->pStack = AuSharedFromThis(); pWrapper->pParent = pNew; pNew->pWriteInteface = pWrapper; pNew->pInterceptorEx = pInterceptor; pNew->pParent = this; if (this->pTopPiece) { this->pTopPiece->pNext = pNew; } this->pTopPiece = pNew; if (!this->pBottomPiece) { this->pBottomPiece = pNew; } this->bWrittenEnd = true; return pNew; } void ProtocolStack::Destroy() { if (this->bOwnsSource) { if (this->pSourceBufer) { this->pSourceBufer->Reset(); this->pSourceBufer.reset(); } } auto pItr = this->pBottomPiece; while (pItr) { auto pCur = pItr; pItr = pCur->pNext; pCur->pNext.reset(); pCur->pInterceptor.reset(); pCur->pInterceptorEx.reset(); pCur->pOuputWriter.reset(); pCur->pWriteInteface.reset(); } this->pBottomPiece.reset(); this->pTopPiece.reset(); } AuSPtr ProtocolStack::AsStreamWriter() { if (!this->bOwnsSource) { return {}; } if (this->pStreamWriterCache) { return this->pStreamWriterCache; } struct A : AuIO::Buffered::BlobWriter { AuWPtr wpStack; A(AuSPtr pBuffer, AuWPtr wpStack) : AuIO::Buffered::BlobWriter(pBuffer), wpStack(wpStack) { } virtual void Close() override { if (auto pStack = AuTryLockMemoryType(wpStack)) { pStack->Terminate(); } } }; return this->pStreamWriterCache = AuMakeShared(this->pSourceBufer, this->SharedFromThis()); } AuSPtr ProtocolStack::AsWritableByteBuffer() { if (!this->bOwnsSource) { return {}; } return this->pSourceBufer; } AuSPtr ProtocolStack::AsStreamReader() { auto pBuffer = AsReadableByteBuffer(); if (!pBuffer) { return {}; } if (this->pStreamReaderCache) { return this->pStreamReaderCache; } struct A : AuIO::Buffered::BlobReader { AuWPtr wpStack; A(AuSPtr pBuffer, AuWPtr wpStack) : AuIO::Buffered::BlobReader(pBuffer), wpStack(wpStack) { } virtual void Close() override { if (auto pStack = AuTryLockMemoryType(wpStack)) { pStack->Terminate(); } } }; return this->pStreamReaderCache = AuMakeShared(pBuffer, this->SharedFromThis()); } AuSPtr ProtocolStack::AsReadableByteBuffer() { if (!this->pTopPiece) { return this->pSourceBufer; } if (this->pTopPiece->outputBuffer.IsEmpty()) { return {}; } return AuSPtr(this->pTopPiece, &this->pTopPiece->outputBuffer); } void ProtocolStack::Terminate() { if (auto pStack = AuTryLockMemoryType(this->pPipeWork)) { (void)pStack->End(); } } bool ProtocolStack::DoTick() { if (!this->pSourceBufer) { return false; } if (!this->pBottomPiece) { return false; } if (!this->DoTick(this->pSourceBufer, {})) { if (this->bKillPipeOnFirstRootLevelFalse) { if (auto pStack = AuTryLockMemoryType(this->pPipeWork)) { pStack->TerminateOnThread(true); } } return false; } return true; } bool ProtocolStack::DoTick(const AuSPtr &pRead, const AuSPtr &pPiece) { auto pCurrent = pPiece ? pPiece : this->pBottomPiece; if (pCurrent->pInterceptorEx) { bool bTryAgain {}; bool bTryAgainAtleastOnce {}; do { auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ? (this->pDrainBuffer) : (!pCurrent->outputBuffer.IsEmpty() ? AuSPtr(pCurrent, &pCurrent->outputBuffer) : AuSPtr {}); auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr; if (pRead) { if (pRead->flagReadError) { return false; } } if (!pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream)) { if (pNextStream) { pNextStream->readPtr = pOldHead; } return bTryAgainAtleastOnce; } if (!pNextStream) { return true; } if (auto pNext = pCurrent->pNext) { auto pOldHead = pNextStream->readPtr; bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext); if (!bDontCareAboutSubTicks) { pNextStream->readPtr = pOldHead; } } if (pCurrent->bMultipleTick) { bTryAgain = pRead->RemainingBytes(); bTryAgainAtleastOnce |= bTryAgain; } if (pCurrent->outputBuffer) { if (pCurrent->outputBuffer.flagWriteError) { return false; } } } while (bTryAgain); return true; } else { AuUInt8 *pBase {}; AuUInt uCount {}; if (pRead->writePtr >= pRead->readPtr) { uCount = pRead->writePtr - pRead->readPtr; pBase = pRead->readPtr; } else if (pRead->flagCircular) { uCount = (pRead->base + pRead->length) - pRead->readPtr; pBase = pRead->readPtr; } auto pNextStream = pCurrent->ToNextWriter(); if (!pCurrent->pInterceptor->OnDataAvailable({ pBase, uCount }, pNextStream)) { return false; } return true; } } AuList> ProtocolStack::GetArrayOfInterceptors() { AuList> list; auto pItr = this->pBottomPiece; while (pItr) { auto pCur = pItr; pItr = pCur->pNext; list.push_back(pCur); } return list; } AuSPtr ProtocolStack::GetInterceptorAtIndex(AuUInt32 uIndex) { AuUInt32 uCounter {}; auto pItr = this->pBottomPiece; while (pItr) { auto pCur = pItr; pItr = pCur->pNext; if (uIndex == uCounter++) { return pCur; } } return {}; } AUKN_SYM AuSPtr NewBufferedProtocolStack(AuUInt uLength) { if (!uLength) { SysPushErrorArg(); return {}; } auto pStack = AuMakeShared(); if (!pStack) { SysPushErrorMem(); return {}; } auto pBuffer = AuMakeShared(uLength, true); if (!pBuffer) { SysPushErrorMem(); return {}; } if (!pBuffer->IsValid()) { return {}; } pStack->bOwnsSource = true; pStack->pSourceBufer = pBuffer; return pStack; } AUKN_SYM AuSPtr NewProtocolStackFromPipe(const AuSPtr &pWork, bool bAutoTick, bool bKillPipeOnFirstRootLevelFalse) { if (!pWork) { SysPushErrorArg(); return {}; } auto pStack = AuMakeShared(); if (!pStack) { SysPushErrorMem(); return {}; } auto pWorkEx = AuStaticCast(pWork); pStack->pPipeWork = pWorkEx; pStack->bKillPipeOnFirstRootLevelFalse = bKillPipeOnFirstRootLevelFalse; if (bAutoTick) { if (pWorkEx->pProtocolStack) { SysPushErrorGeneric("NewProtocolStackFromPipe called twice. wont recv DoTick"); return {}; } else { pWorkEx->pProtocolStack = pStack; pStack->pSourceBufer = AuUnsafeRaiiToShared(pWorkEx->GetBuffer()); return pStack; } } pStack->pSourceBufer = AuSPtr(pWork, AuStaticCast(pWork)->GetBuffer()); return pStack; } }