/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AuProtocolStack.cpp
    Date: 2022-8-24
    Author: Reece
***/
// NOTE(review): this copy of the file appears to have lost every `<...>` span: the target of
// the last #include below and all template argument lists (e.g. `AuSPtr`, `AuMakeShared()`,
// `AuList>`). Those tokens are reproduced exactly as found and must be restored from the
// original file before this translation unit can compile.
#include "Protocol.hpp"
#include "AuProtocolStack.hpp"
#include "AuProtocolPiece.hpp"
#include "IProtocolNext.hpp"
#include

namespace Aurora::IO::Protocol
{
    // Binds the read and write stream views to this stack instance.
    ProtocolStack::ProtocolStack() :
        readView(this),
        writeView(this)
    {
    }

    ProtocolStack::~ProtocolStack()
    {
    }

    // ------------------------------------------------------------------------------------
    // Public Append*/Prepend* entry points.
    //
    // Each one is a thin forwarder to AddInterceptorWhere / AddInterceptorWhereEx:
    //   * Append*  passes bPrepend = false, Prepend* passes bPrepend = true.
    //   * *SingleFrameProcessor* passes true for the fourth argument of AddInterceptorWhereEx
    //     (named bMultipleTick there); the other Ex variants pass false.
    //   * Variants without a size parameter pass 0, which AddInterceptorWhereEx replaces with
    //     gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize.
    //   * *DynamicBuffer variants additionally forward uOutputBufferSizeMax as uMax.
    // ------------------------------------------------------------------------------------

    AuSPtr ProtocolStack::AppendInterceptor(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize)
    {
        return this->AddInterceptorWhere(false, pInterceptor, uOutputBufferSize);
    }

    AuSPtr ProtocolStack::AppendInterceptorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize)
    {
        return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, false);
    }

    AuSPtr ProtocolStack::AppendSingleFrameProcessor(const AuSPtr &pInterceptorEx)
    {
        return this->AddInterceptorWhereEx(false, pInterceptorEx, 0, true);
    }

    AuSPtr ProtocolStack::AppendSingleFrameProcessorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize)
    {
        return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, true);
    }

    AuSPtr ProtocolStack::PrependSingleFrameProcessor(const AuSPtr &pInterceptorEx)
    {
        return this->AddInterceptorWhereEx(true, pInterceptorEx, 0, true);
    }

    AuSPtr ProtocolStack::PrependSingleFrameProcessorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize)
    {
        return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, true);
    }

    AuSPtr ProtocolStack::AppendInterceptorDynamicBuffer(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize, AuUInt uOutputBufferSizeMax)
    {
        return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, false, uOutputBufferSizeMax);
    }

    AuSPtr ProtocolStack::PrependInterceptorDynamicBuffer(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize, AuUInt uOutputBufferSizeMax)
    {
        return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, false, uOutputBufferSizeMax);
    }

    AuSPtr ProtocolStack::PrependSingleFrameProcessorDynamicBuffer(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize, AuUInt uOutputBufferSizeMax)
    {
        return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, true, uOutputBufferSizeMax);
    }

    AuSPtr ProtocolStack::AppendSingleFrameProcessorDynamicBuffer(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize, AuUInt uOutputBufferSizeMax)
    {
        return this->AddInterceptorWhereEx(false, pInterceptorEx, uOutputBufferSize, true, uOutputBufferSizeMax);
    }

    AuSPtr ProtocolStack::PrependInterceptor(const AuSPtr &pInterceptor, AuUInt uOutputBufferSize)
    {
        return this->AddInterceptorWhere(true, pInterceptor, uOutputBufferSize);
    }

    AuSPtr ProtocolStack::PrependInterceptorEx(const AuSPtr &pInterceptorEx, AuUInt uOutputBufferSize)
    {
        return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, false);
    }

    // Creates a new piece around pInterceptor (stored in the piece's pInterceptor slot) and
    // links it at the bottom (bPrepend) or the top of the chain.
    //
    // bPrepend          - true: the new piece becomes pBottomPiece; false: it becomes pTopPiece.
    // pInterceptor      - interceptor stored on the new piece.
    // uOutputBufferSize - size of the piece's output buffer; 0 selects
    //                     gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize.
    //
    // Returns the new piece, or an empty pointer when an end interceptor has already been
    // written (bWrittenEnd) or an allocation fails.
    AuSPtr ProtocolStack::AddInterceptorWhere(bool bPrepend, const AuSPtr &pInterceptor, AuUInt uOutputBufferSize)
    {
        AU_LOCK_GUARD(this->mutex);

        if (this->bWrittenEnd)
        {
            return {};
        }

        if (!uOutputBufferSize)
        {
            uOutputBufferSize = gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize;
        }

        auto pNew = AuMakeShared();
        if (!pNew)
        {
            SysPushErrorNet("Out of memory");
            return {};
        }

        pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
        if (!pNew->outputBuffer.IsValid())
        {
            SysPushErrorNet("Out of memory");
            return {};
        }

        pNew->outputBuffer.flagNoFree = true;

        // Circular ref
        pNew->pOuputWriter = AuMakeShared(AuSPtr(pNew, &pNew->outputBuffer));

        // Writer handed to the interceptor: data written to it is ticked through the stack
        // starting at the owning piece (pParent).
        struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis
        {
            AuWPtr pStack;
            AuWPtr pInterceptor;
            AuWPtr pParent;

            EStreamError IsOpen() override
            {
                return EStreamError::eErrorNone;
            }

            EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
            {
                auto pCopy = AuMakeShared(parameters);
                if (!pCopy || !(*pCopy.get()))
                {
                    return EStreamError::eErrorOutOfMemory;
                }

                // Only a weak reference is held; the stack may already have been released.
                auto pOwner = AuTryLockMemoryType(this->pStack);
                if (!pOwner)
                {
                    return EStreamError::eErrorStreamInterrupted;
                }

                return pOwner->DoTick(pCopy, pParent.lock()) ?
                       EStreamError::eErrorNone :
                       EStreamError::eErrorStreamInterrupted;
            }

            void Close() override
            {
                if (auto pStack = AuTryLockMemoryType(this->pStack))
                {
                    pStack->Terminate();
                }
            }

            virtual void Flush() override
            {
                if (auto pStack = AuTryLockMemoryType(this->pStack))
                {
                    pStack->DoTick();
                }
            }

            virtual AuSPtr GetOutputBuffer() override
            {
                return {};
            }

            virtual AuSPtr GetStreamWriter() override
            {
                return AuSharedFromThis();
            }
        };

        auto pWrapper = AuMakeShared();
        if (!pWrapper)
        {
            SysPushErrorMemory();
            return {};
        }

        pWrapper->pInterceptor = pInterceptor;
        pWrapper->pStack = AuSharedFromThis();
        pWrapper->pParent = pNew;

        pNew->pWriteInteface = pWrapper;
        pNew->pParent = this;
        pNew->pInterceptor = pInterceptor;

        if (bPrepend)
        {
            if (this->pBottomPiece)
            {
                pNew->pNext = this->pBottomPiece;
            }

            this->pBottomPiece = pNew;

            // Prepending into an empty stack: the sole piece is also the top. Without this a
            // later append would see a null pTopPiece, skip linking, and orphan its piece.
            if (!this->pTopPiece)
            {
                this->pTopPiece = pNew;
            }
        }
        else
        {
            if (this->pTopPiece)
            {
                this->pTopPiece->pNext = pNew;
            }

            this->pTopPiece = pNew;

            if (!this->pBottomPiece)
            {
                this->pBottomPiece = pNew;
            }
        }

        this->DiscardAllocCaches();

        return pNew;
    }

    // Same as AddInterceptorWhere, but the interceptor is stored in the piece's pInterceptorEx
    // slot and extra per-piece options are recorded.
    //
    // bPrepend          - true: link at the bottom; false: link at the top.
    // pInterceptor      - interceptor stored as pInterceptorEx on the new piece.
    // uOutputBufferSize - initial output buffer size; 0 selects the configured default. Also
    //                     recorded as the piece's uStartingSize.
    // bMultipleTick     - stored on the piece; DoTick re-runs the interceptor while the input
    //                     still has remaining bytes when this is set.
    // uMax              - when set, stored as the piece's uMax and the output buffer is marked
    //                     expandable; otherwise the buffer is flagged flagNoFree.
    //
    // Returns the new piece, or an empty pointer when bWrittenEnd is set, there is no source
    // buffer, or an allocation fails.
    AuSPtr ProtocolStack::AddInterceptorWhereEx(bool bPrepend, const AuSPtr &pInterceptor, AuUInt uOutputBufferSize, bool bMultipleTick, AuOptional uMax)
    {
        AU_LOCK_GUARD(this->mutex);

        if (this->bWrittenEnd)
        {
            return {};
        }

        if (!this->pSourceBufer)
        {
            return {};
        }

        if (!uOutputBufferSize)
        {
            uOutputBufferSize = gRuntimeConfig.ioConfig.uProtocolStackDefaultBufferSize;
        }

        auto pNew = AuMakeShared();
        if (!pNew)
        {
            SysPushErrorNet("Out of memory");
            return {};
        }

        pNew->outputBuffer = AuByteBuffer(uOutputBufferSize, true, false);
        if (!pNew->outputBuffer.IsValid())
        {
            SysPushErrorNet("Out of memory");
            return {};
        }

        if (uMax)
        {
            pNew->uMax = uMax;
            pNew->outputBuffer.flagAlwaysExpandable = pNew->outputBuffer.flagExpandable = 1;
        }
        else
        {
            pNew->outputBuffer.flagNoFree = true;
        }

        pNew->uStartingSize = uOutputBufferSize;

        // Circular ref
        pNew->pOuputWriter = AuMakeShared(AuSPtr(pNew, &pNew->outputBuffer));

        // Writer handed to the interceptor: data written to it is ticked through the stack
        // starting at the owning piece (pParent).
        struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis
        {
            AuWPtr pStack;
            AuWPtr pInterceptor;
            AuWPtr pParent;

            EStreamError IsOpen() override
            {
                return EStreamError::eErrorNone;
            }

            EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
            {
                auto pCopy = AuMakeShared(parameters);
                if (!pCopy || !(*pCopy.get()))
                {
                    return EStreamError::eErrorOutOfMemory;
                }

                // Only a weak reference is held; the stack may already have been released.
                auto pOwner = AuTryLockMemoryType(this->pStack);
                if (!pOwner)
                {
                    return EStreamError::eErrorStreamInterrupted;
                }

                return pOwner->DoTick(pCopy, pParent.lock()) ?
                       EStreamError::eErrorNone :
                       EStreamError::eErrorStreamInterrupted;
            }

            void Close() override
            {
                if (auto pStack = AuTryLockMemoryType(this->pStack))
                {
                    pStack->Terminate();
                }
            }

            virtual void Flush() override
            {
                if (auto pStack = AuTryLockMemoryType(this->pStack))
                {
                    pStack->DoTick();
                }
            }

            virtual AuSPtr GetOutputBuffer() override
            {
                return {};
            }

            virtual AuSPtr GetStreamWriter() override
            {
                return AuSharedFromThis();
            }
        };

        auto pWrapper = AuMakeShared();
        if (!pWrapper)
        {
            SysPushErrorMemory();
            return {};
        }

        pWrapper->pInterceptor = pInterceptor;
        pWrapper->pStack = AuSharedFromThis();
        pWrapper->pParent = pNew;

        pNew->pWriteInteface = pWrapper;
        pNew->pInterceptorEx = pInterceptor;
        pNew->pParent = this;
        pNew->bMultipleTick = bMultipleTick;

        if (bPrepend)
        {
            if (this->pBottomPiece)
            {
                pNew->pNext = this->pBottomPiece;
            }

            this->pBottomPiece = pNew;

            // Prepending into an empty stack: the sole piece is also the top. Without this a
            // later append would see a null pTopPiece, skip linking, and orphan its piece.
            if (!this->pTopPiece)
            {
                this->pTopPiece = pNew;
            }
        }
        else
        {
            if (this->pTopPiece)
            {
                this->pTopPiece->pNext = pNew;
            }

            this->pTopPiece = pNew;

            if (!this->pBottomPiece)
            {
                this->pBottomPiece = pNew;
            }
        }

        this->DiscardAllocCaches();

        return pNew;
    }

    // Appends a final piece around pInterceptor (stored as pInterceptorEx) without allocating
    // an output buffer for it, then sets bWrittenEnd so that every later Add* call returns an
    // empty pointer.
    //
    // Returns the new piece, or an empty pointer when bWrittenEnd is already set, there is no
    // source buffer, or an allocation fails.
    AuSPtr ProtocolStack::AddEndInterceptor(const AuSPtr &pInterceptor)
    {
        AU_LOCK_GUARD(this->mutex);

        if (this->bWrittenEnd)
        {
            return {};
        }

        if (!this->pSourceBufer)
        {
            return {};
        }

        auto pNew = AuMakeShared();
        if (!pNew)
        {
            SysPushErrorNet("Out of memory");
            return {};
        }

        // Writer handed to the interceptor: data written to it is ticked through the stack
        // starting at the owning piece (pParent).
        struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis
        {
            AuWPtr pStack;
            AuWPtr pInterceptor;
            AuWPtr pParent;

            EStreamError IsOpen() override
            {
                return EStreamError::eErrorNone;
            }

            EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
            {
                auto pCopy = AuMakeShared(parameters);
                if (!pCopy || !(*pCopy.get()))
                {
                    return EStreamError::eErrorOutOfMemory;
                }

                // Only a weak reference is held; the stack may already have been released.
                auto pOwner = AuTryLockMemoryType(this->pStack);
                if (!pOwner)
                {
                    return EStreamError::eErrorStreamInterrupted;
                }

                return pOwner->DoTick(pCopy, pParent.lock()) ?
                       EStreamError::eErrorNone :
                       EStreamError::eErrorStreamInterrupted;
            }

            void Close() override
            {
                if (auto pStack = AuTryLockMemoryType(this->pStack))
                {
                    pStack->Terminate();
                }
            }

            virtual void Flush() override
            {
                if (auto pStack = AuTryLockMemoryType(this->pStack))
                {
                    pStack->DoTick();
                }
            }

            virtual AuSPtr GetOutputBuffer() override
            {
                return {};
            }

            virtual AuSPtr GetStreamWriter() override
            {
                return AuSharedFromThis();
            }
        };

        auto pWrapper = AuMakeShared();
        if (!pWrapper)
        {
            SysPushErrorMemory();
            return {};
        }

        pWrapper->pInterceptor = pInterceptor;
        pWrapper->pStack = AuSharedFromThis();
        pWrapper->pParent = pNew;

        pNew->pWriteInteface = pWrapper;
        pNew->pInterceptorEx = pInterceptor;
        pNew->pParent = this;

        if (this->pTopPiece)
        {
            this->pTopPiece->pNext = pNew;
        }

        this->pTopPiece = pNew;

        if (!this->pBottomPiece)
        {
            this->pBottomPiece = pNew;
        }

        this->bWrittenEnd = true;

        this->DiscardAllocCaches();

        return pNew;
    }

    // Releases the owned source buffer (only when bOwnsSource) and walks the chain from the
    // bottom piece, clearing each piece's next link, interceptors and writers before dropping
    // the stack's own bottom/top references.
    void ProtocolStack::Destroy()
    {
        AU_LOCK_GUARD(this->mutex);

        if (this->bOwnsSource)
        {
            if (this->pSourceBufer)
            {
                this->pSourceBufer->Reset();
                this->pSourceBufer.reset();
            }
        }

        auto pItr = this->pBottomPiece;
        while (pItr)
        {
            auto pCur = pItr;
            // Grab the successor before the link is cleared.
            pItr = pCur->pNext;

            pCur->pNext.reset();
            pCur->pInterceptor.reset();
            pCur->pInterceptorEx.reset();
            pCur->pOuputWriter.reset();
            pCur->pWriteInteface.reset();
        }

        this->pBottomPiece.reset();
        this->pTopPiece.reset();
    }

    // Returns the stack's write view (sharing ownership with the stack), or an empty pointer
    // when the stack does not own its source buffer.
    AuSPtr ProtocolStack::AsStreamWriter()
    {
        if (!this->bOwnsSource)
        {
            return {};
        }

        return AuSPtr(this->SharedFromThis(), &this->writeView);
    }

    // Returns the source buffer, or an empty pointer when the stack does not own it.
    AuSPtr ProtocolStack::AsWritableByteBuffer()
    {
        if (!this->bOwnsSource)
        {
            return {};
        }

        return this->pSourceBufer;
    }

    // Returns the stack's read view (sharing ownership with the stack), or an empty pointer
    // when AsReadableByteBuffer() yields nothing.
    AuSPtr ProtocolStack::AsStreamReader()
    {
        auto pBuffer = AsReadableByteBuffer();
        if (!pBuffer)
        {
            return {};
        }

        return AuSPtr(this->SharedFromThis(), &this->readView);
    }

    // Returns the buffer holding the stack's output: the source buffer when there are no
    // pieces, otherwise the top piece's output buffer (empty pointer if that buffer reports
    // IsEmpty()).
    AuSPtr ProtocolStack::AsReadableByteBuffer()
    {
        if (!this->pTopPiece)
        {
            return this->pSourceBufer;
        }

        if (this->pTopPiece->outputBuffer.IsEmpty())
        {
            return {};
        }

        return AuSPtr(this->pTopPiece, &this->pTopPiece->outputBuffer);
    }

    // Ends the associated pipe work (if it is still alive) and marks the stack dead.
    void ProtocolStack::Terminate()
    {
        AU_LOCK_GUARD(this->mutex);

        if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
        {
            (void)pStack->End();
        }

        this->bDead = true;
    }

    // Locks the stack and runs one root-level tick.
    bool ProtocolStack::DoTick()
    {
        AU_LOCK_GUARD(this->mutex);
        return this->DoTickEx(true);
    }

    // Runs one root-level tick over the source buffer. Does not take the mutex itself.
    //
    // Returns false when the stack is dead, has no source buffer or no pieces, when a read for
    // an adhoc-stream stack fails, or when the tick itself fails. On a failed tick:
    //   * with bKillPipeOnFirstRootLevelFalse set, the stack is marked dead (and, for
    //     pipe-backed stacks, the pipe work is terminated);
    //   * otherwise, adhoc-stream stacks force one more read and retry the tick once.
    //
    // NOTE(review): bPermitRead is not consulted anywhere in this function.
    bool ProtocolStack::DoTickEx(bool bPermitRead)
    {
        if (this->bDead)
        {
            return false;
        }

        if (!this->pSourceBufer)
        {
            return false;
        }

        if (!this->pBottomPiece)
        {
            return false;
        }

        if (this->eType == EProtocolStackCreateType::eByBufferedAdhocStream)
        {
            if (!this->TryReadMore())
            {
                return false;
            }
        }

        if (!this->DoTick(this->pSourceBufer, {}))
        {
            if (this->bKillPipeOnFirstRootLevelFalse)
            {
                if (this->eType == EProtocolStackCreateType::eByPipe)
                {
                    if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
                    {
                        pStack->TerminateOnThread(true);
                    }
                }

                this->bDead = true;
            }
            else
            {
                if (this->eType == EProtocolStackCreateType::eByBufferedAdhocStream)
                {
                    if (this->TryReadMore(true))
                    {
                        if (this->DoTick(this->pSourceBufer, {}))
                        {
                            return true;
                        }
                    }
                }
            }

            return false;
        }

        return true;
    }

    // Pulls more bytes from pStreamReaderAsInputSource into the source buffer.
    //
    // bForce - when false, nothing is read if the source buffer's read and write pointers
    //          differ.
    //
    // Returns true when nothing needed to be read, when there is no input source, or when the
    // read succeeded (the write pointer is advanced by the number of bytes read). Returns false
    // when the read reports an error; the pipe work, if alive, is terminated in that case.
    bool ProtocolStack::TryReadMore(bool bForce)
    {
        if (!bForce)
        {
            if (this->pSourceBufer->readPtr != this->pSourceBufer->writePtr)
            {
                return true;
            }
        }

        auto pSource = this->pStreamReaderAsInputSource;
        if (!pSource)
        {
            return true;
        }

        // NOTE(review): uFragment is computed from uPreferredFragmentSize but is never passed
        // to Read(); the read below targets the whole buffer view. Confirm whether the read
        // was meant to be limited to uFragment bytes.
        auto uRemBytes = this->pSourceBufer->RemainingWrite();
        auto uFragment = this->uPreferredFragmentSize ? AuMin(uRemBytes, this->uPreferredFragmentSize) : uRemBytes;

        AuUInt uBytesRead {};
        if (AuIO::EStreamError::eErrorNone != pSource->Read({ AuStaticCast(*this->pSourceBufer), uBytesRead }))
        {
            if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
            {
                pStack->TerminateOnThread(true);
            }

            return false;
        }

        this->pSourceBufer->writePtr += uBytesRead;
        return true;
    }

    // Feeds pRead through the chain starting at pPiece (or at the bottom piece when pPiece is
    // empty), recursing into each following piece with the current piece's output.
    //
    // Pieces holding a pInterceptorEx receive the buffers directly and may be re-run while
    // input remains (bMultipleTick); pieces holding only a pInterceptor receive a view over
    // the readable bytes and the piece's next writer.
    //
    // Returns false when there is no piece to run, the input carries a read error, the
    // interceptor rejects the data (unless an earlier iteration of the retry loop asked to go
    // again), a dynamic output buffer hits its maximum or cannot be reallocated, or the output
    // buffer carries a write error.
    bool ProtocolStack::DoTick(const AuSPtr &pRead, const AuSPtr &pPiece)
    {
        auto pCurrent = pPiece ? pPiece : this->pBottomPiece;

        // Reachable with no piece at all, e.g. a piece writer used after Destroy().
        if (!pCurrent)
        {
            return false;
        }

        if (pCurrent->pInterceptorEx)
        {
            bool bTryAgain {};
            bool bTryAgainAtleastOnce {};

            do
            {
                auto pNextStream = ((pPiece == this->pTopPiece) && (this->pDrainBuffer)) ?
                                   (this->pDrainBuffer) :
                                   (!pCurrent->outputBuffer.IsEmpty() ? AuSPtr(pCurrent, &pCurrent->outputBuffer) : AuSPtr {});

                // Remembered so a rejected tick can roll the output's read head back.
                auto pOldHead = pNextStream ? pNextStream->readPtr : nullptr;

                if (pRead)
                {
                    if (pRead->flagReadError)
                    {
                        return false;
                    }
                }

                bool bStatus = pCurrent->pInterceptorEx->OnDataAvailable(pRead, pNextStream, pCurrent);
                if (!bStatus)
                {
                    if (pNextStream)
                    {
                        pNextStream->readPtr = pOldHead;
                    }

                    return bTryAgainAtleastOnce;
                }
                else
                {
                    // Dynamic buffer: resize the output towards what is currently buffered,
                    // bounded by [uStartingSize, uMax].
                    if (pCurrent->uMax.has_value())
                    {
                        auto bytesRem = pCurrent->outputBuffer.RemainingBytes();
                        auto maxVal = pCurrent->uMax.value();

                        if (bytesRem >= maxVal)
                        {
                            this->Terminate();
                            return false;
                        }

                        auto target = AuMin(AuPageRound(bytesRem + ((bytesRem / 4) * 3), 128), maxVal);
                        target = AuMax(target, pCurrent->uStartingSize);

                        if (target >= pCurrent->outputBuffer.allocSize ||
                            ((pCurrent->outputBuffer.length > target) && (bytesRem < pCurrent->uStartingSize)))
                        {
                            AuByteBuffer replacement(target, true, false);
                            if (!replacement)
                            {
                                this->Terminate();
                                return false;
                            }

                            if (!replacement.WriteFrom(pCurrent->outputBuffer))
                            {
                                this->Terminate();
                                return false;
                            }

                            pCurrent->outputBuffer = AuMove(replacement);
                            pCurrent->outputBuffer.flagAlwaysExpandable = pCurrent->outputBuffer.flagExpandable = 1;
                        }
                    }
                }

                if (!pNextStream)
                {
                    return true;
                }

                if (auto pNext = pCurrent->pNext)
                {
                    auto pOldHead = pNextStream->readPtr;
                    bool bDontCareAboutSubTicks = this->DoTick(pNextStream, pNext);
                    if (!bDontCareAboutSubTicks)
                    {
                        // The downstream piece rejected the data: leave it unread.
                        pNextStream->readPtr = pOldHead;
                    }
                }

                if (pCurrent->bMultipleTick)
                {
                    // pRead is treated as optional above, so keep that consistent here.
                    bTryAgain = pRead && pRead->RemainingBytes();
                    bTryAgainAtleastOnce |= bTryAgain;
                }

                if (pCurrent->outputBuffer)
                {
                    if (pCurrent->outputBuffer.flagWriteError)
                    {
                        return false;
                    }
                }
            }
            while (bTryAgain);

            return true;
        }
        else
        {
            AuUInt8 *pBase {};
            AuUInt uCount {};

            if (pRead->writePtr >= pRead->readPtr)
            {
                uCount = pRead->writePtr - pRead->readPtr;
                pBase = pRead->readPtr;
            }
            else if (pRead->flagCircular)
            {
                // Wrapped circular buffer: expose only the run up to the end of the storage.
                uCount = (pRead->base + pRead->length) - pRead->readPtr;
                pBase = pRead->readPtr;
            }

            auto pNextStream = pCurrent->ToNextWriter();

            if (!pCurrent->pInterceptor->OnDataAvailable({ pBase, uCount }, pNextStream))
            {
                return false;
            }

            return true;
        }
    }

    // Returns every piece in chain order, starting at the bottom piece.
    AuList> ProtocolStack::GetArrayOfInterceptors()
    {
        AU_LOCK_GUARD(this->mutex);

        AuList> list;

        auto pItr = this->pBottomPiece;
        while (pItr)
        {
            auto pCur = pItr;
            pItr = pCur->pNext;

            list.push_back(pCur);
        }

        return list;
    }

    // Returns the piece at zero-based position uIndex counted from the bottom piece, or an
    // empty pointer when the chain is shorter than that.
    AuSPtr ProtocolStack::GetInterceptorAtIndex(AuUInt32 uIndex)
    {
        AU_LOCK_GUARD(this->mutex);

        AuUInt32 uCounter {};

        auto pItr = this->pBottomPiece;
        while (pItr)
        {
            auto pCur = pItr;
            pItr = pCur->pNext;

            if (uIndex == uCounter++)
            {
                return pCur;
            }
        }

        return {};
    }

    // True until the stack has been marked dead.
    bool ProtocolStack::IsValid()
    {
        return !this->bDead;
    }

    // Drops the cached stream reader/writer objects; called whenever the chain changes.
    void ProtocolStack::DiscardAllocCaches()
    {
        AuResetMember(this->pStreamReaderCache);
        AuResetMember(this->pStreamWriterCache);
    }

    // Builds a stack that owns a freshly allocated source buffer of uLength bytes.
    // Returns an empty pointer when uLength is zero or an allocation fails.
    static AuSPtr _NewBufferedProtocolStack(AuUInt uLength, ProtocolStackExtraData ex)
    {
        if (!uLength)
        {
            SysPushErrorArg();
            return {};
        }

        auto pStack = AuMakeShared();
        if (!pStack)
        {
            SysPushErrorMem();
            return {};
        }

        pStack->eType = EProtocolStackCreateType::eByBuffered;

        auto pBuffer = AuMakeShared(uLength, true);
        if (!pBuffer)
        {
            SysPushErrorMem();
            return {};
        }

        if (!pBuffer->IsValid())
        {
            return {};
        }

        pStack->bOwnsSource = true;
        pStack->pSourceBufer = pBuffer;
        pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;

        if (ex.bKillPipeOnFirstRootLevelFalse)
        {
            pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
        }

        return pStack;
    }

    AUKN_SYM AuSPtr NewBufferedProtocolStack(AuUInt uLength)
    {
        return _NewBufferedProtocolStack(uLength, {});
    }

    // Builds a stack whose source buffer is the buffer of an existing pipe work object.
    //
    // bAutoTick - when set, the stack is also registered on the pipe work (pProtocolStack);
    //             this fails if a stack is already registered there.
    // bKillPipeOnFirstRootLevelFalse - stored on the stack; overridden by the same-named
    //             optional in ex when that one holds a value.
    //
    // Returns an empty pointer when pWork is empty, an allocation fails, or the pipe work
    // already has a registered stack.
    static AuSPtr _NewProtocolStackFromPipe(const AuSPtr &pWork, bool bAutoTick, bool bKillPipeOnFirstRootLevelFalse, ProtocolStackExtraData ex)
    {
        if (!pWork)
        {
            SysPushErrorArg();
            return {};
        }

        auto pStack = AuMakeShared();
        if (!pStack)
        {
            SysPushErrorMem();
            return {};
        }

        pStack->eType = EProtocolStackCreateType::eByPipe;

        auto pWorkEx = AuStaticCast(pWork);
        pStack->pPipeWork = pWorkEx;

        pStack->bKillPipeOnFirstRootLevelFalse = bKillPipeOnFirstRootLevelFalse;
        if (ex.bKillPipeOnFirstRootLevelFalse)
        {
            pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
        }

        pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;

        if (bAutoTick)
        {
            if (pWorkEx->pProtocolStack)
            {
                SysPushErrorGeneric("NewProtocolStackFromPipe called twice. wont recv DoTick");
                return {};
            }
            else
            {
                pWorkEx->pProtocolStack = pStack;
                pStack->pSourceBufer = AuUnsafeRaiiToShared(pWorkEx->GetBuffer());
                return pStack;
            }
        }

        pStack->pSourceBufer = AuSPtr(pWork, AuStaticCast(pWork)->GetBuffer());
        return pStack;
    }

    // Builds a stack that owns a source buffer of data.uBufferedStreamLength bytes and refills
    // it from data.pStreamReader. Returns an empty pointer when the length is zero or an
    // allocation fails.
    static AuSPtr _NewProtocolStackFromStream(const ProtocolStackByBufferedAdhoc &data, ProtocolStackExtraData ex)
    {
        auto pStack = AuMakeShared();
        if (!pStack)
        {
            SysPushErrorMem();
            return {};
        }

        if (!data.uBufferedStreamLength)
        {
            SysPushErrorArg();
            return {};
        }

        pStack->eType = EProtocolStackCreateType::eByBufferedAdhocStream;

        auto pBuffer = AuMakeShared(data.uBufferedStreamLength, true);
        if (!pBuffer)
        {
            SysPushErrorMem();
            return {};
        }

        if (!pBuffer->IsValid())
        {
            return {};
        }

        if (ex.bKillPipeOnFirstRootLevelFalse)
        {
            pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
        }

        pStack->bOwnsSource = true;
        pStack->pSourceBufer = pBuffer;
        pStack->pStreamReaderAsInputSource = data.pStreamReader;
        pStack->uPreferredFragmentSize = data.uPreferredFragmentSize;
        pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;
        pStack->bStreamReaderFlagAdhocRead = ex.bStreamReaderFlagAdhocReadFromAdhoc;

        return pStack;
    }

    AUKN_SYM AuSPtr NewProtocolStackFromPipe(const AuSPtr &pWork, bool bAutoTick, bool bKillPipeOnFirstRootLevelFalse)
    {
        return _NewProtocolStackFromPipe(pWork, bAutoTick, bKillPipeOnFirstRootLevelFalse, {});
    }

    // Dispatches on description.type to the matching factory above. Returns an empty pointer
    // when the type is not a valid EProtocolStackCreateType.
    AUKN_SYM AuSPtr NewProtocolStackFromDescription(const ProtocolStackDescription &description)
    {
        if (!EProtocolStackCreateTypeIsValid(description.type))
        {
            SysPushErrorArg();
            return {};
        }

        switch (description.type)
        {
        case EProtocolStackCreateType::eByPipe:
            return _NewProtocolStackFromPipe(description.byPipe.pWork,
                                             description.byPipe.bAutoTick,
                                             description.byPipe.bKillPipeOnFirstRootLevelFalse,
                                             description.extendedFlags);
        case EProtocolStackCreateType::eByBuffered:
            return _NewBufferedProtocolStack(description.byBuffered.uBufferedStreamLength,
                                             description.extendedFlags);
        case EProtocolStackCreateType::eByBufferedAdhocStream:
            return _NewProtocolStackFromStream(description.byBufferedStream,
                                               description.extendedFlags);
        default:
            SysUnreachable();
        }
    }

    // Wraps pStreamReader in a new adhoc-stream stack and appends one piece per entry of
    // pairs, in order. Each entry unpacks to (bSingle, uNextBufferSize, pPointer):
    //   * bSingle entries are added via AppendSingleFrameProcessorEx;
    //   * a non-single entry in the last position is added via AddEndInterceptor;
    //   * every other entry is added via AppendInterceptorEx.
    //
    // pOptOutStack - optional; receives the created stack on success.
    //
    // Returns the stack's stream reader, or an empty pointer when the stack or any piece could
    // not be created.
    AUKN_SYM AuSPtr UtilityWrapStreamReaderWithInterceptors(AuSPtr pStreamReader, AuList>> pairs, AuUInt uBaseStreamLength, AuUInt uPreferredFragmentSize, AuSPtr *pOptOutStack)
    {
        ProtocolStackDescription desc;
        desc.type = EProtocolStackCreateType::eByBufferedAdhocStream;
        desc.byBufferedStream.pStreamReader = pStreamReader;
        desc.byBufferedStream.uBufferedStreamLength = uBaseStreamLength;
        desc.byBufferedStream.uPreferredFragmentSize = uPreferredFragmentSize;
        desc.extendedFlags.bStreamReaderFlagAdhocReadFromAdhoc = true;

        AuSPtr pStack;
        if (!(pStack = NewProtocolStackFromDescription(desc)))
        {
            SysPushErrorNested();
            return {};
        }

        for (AU_ITERATE_N(i, pairs.size()))
        {
            const auto &[bSingle, uNextBufferSize, pPointer] = pairs[i];

            if (bSingle)
            {
                if (!pStack->AppendSingleFrameProcessorEx(pPointer, uNextBufferSize))
                {
                    SysPushErrorNested();
                    return {};
                }
            }
            else
            {
                if (i == pairs.size() - 1)
                {
                    if (!pStack->AddEndInterceptor(pPointer))
                    {
                        SysPushErrorNested();
                        return {};
                    }
                }
                else
                {
                    if (!pStack->AppendInterceptorEx(pPointer, uNextBufferSize))
                    {
                        SysPushErrorNested();
                        return {};
                    }
                }
            }
        }

        if (pOptOutStack)
        {
            *pOptOutStack = pStack;
        }

        return pStack->AsStreamReader();
    }
}