[+] Aurora::IO::Protocol::NewProtocolStackFromDescription

[+] Aurora::IO::Protocol::UtilityWrapStreamReaderWithInterceptors
[+] Aurora::IO::Protocol::EProtocolStackCreateType
[+] Aurora::IO::Protocol::ProtocolStackByBufferedAdhoc
[+] Aurora::IO::Protocol::ProtocolStackByBuffered
[+] Aurora::IO::Protocol::ProtocolStackByPipe
[+] Aurora::IO::Protocol::ProtocolStackDescription
This commit is contained in:
Reece Wilson 2023-09-21 08:02:02 +01:00
parent f310fa9aae
commit 7da39d2022
7 changed files with 453 additions and 81 deletions

View File

@ -134,5 +134,7 @@ namespace Aurora::IO::Protocol
* @param pWork
* @return
*/
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork, bool bAutoTick = true, bool bKillPipeOnFirstRootLevelFalse = true);
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork,
bool bAutoTick = true,
bool bKillPipeOnFirstRootLevelFalse = true);
}

View File

@ -17,3 +17,5 @@
#include "IProtocolStack.hpp"
#include "IProtocolLimitable.hpp"
#include "ProtocolStackFromDescription.hpp"

View File

@ -0,0 +1,60 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ProtocolStackFromDescription.cpp
Date: 2023-09-21
Author: Reece
***/
#pragma once
namespace Aurora::IO::Protocol
{
struct ProtocolStackByBuffered
{
AuUInt uBufferedStreamLength { 128 * 1024 };
};
struct ProtocolStackByBufferedAdhoc : ProtocolStackByBuffered
{
AuSPtr<IStreamReader> pStreamReader {};
AuUInt uPreferredFragmentSize {};
};
struct ProtocolStackByPipe
{
AuSPtr<IIOPipeWork> pWork;
bool bAutoTick = true;
bool bKillPipeOnFirstRootLevelFalse = true;
};
struct ProtocolStackExtraData
{
AuOptionalEx<bool> bKillPipeOnFirstRootLevelFalse {};
bool bStreamReaderFlagAdhocPump {};
bool bStreamReaderFlagAdhocReadFromAdhoc {};
bool bFeatPending {};
};
AUE_DEFINE_VA(EProtocolStackCreateType,
eByPipe,
eByBuffered,
eByBufferedAdhocStream
);
struct ProtocolStackDescription
{
EProtocolStackCreateType type;
ProtocolStackByPipe byPipe;
ProtocolStackByBuffered byBuffered;
ProtocolStackByBufferedAdhoc byBufferedStream;
ProtocolStackExtraData extendedFlags;
};
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromDescription(const ProtocolStackDescription &description);
AUKN_SYM AuSPtr<IStreamReader> UtilityWrapStreamReaderWithInterceptors(AuSPtr<IStreamReader> pStreamReader,
AuList<AuTuple<bool /* bSingleFrame*/, AuUInt /*nextBufferSize*/, AuSPtr<IProtocolInterceptorEx>>> pairs,
AuUInt uBaseStreamLength = 128 * 1024,
AuUInt uPreferredFragmentSize = 0,
AuSPtr<IProtocolStack> *pOptOutStack = nullptr);
}

View File

@ -13,6 +13,12 @@
namespace Aurora::IO::Protocol
{
ProtocolStack::ProtocolStack() :
readView(this)
{
}
ProtocolStack::~ProtocolStack()
{
}
@ -120,7 +126,6 @@ namespace Aurora::IO::Protocol
}
};
virtual void Flush() override
{
if (auto pStack = AuTryLockMemoryType(this->pStack))
@ -463,6 +468,11 @@ namespace Aurora::IO::Protocol
{
auto pStackBuffer = pStack->pSourceBufer;//pStack->AsWritableByteBuffer();
if (pStack->bDead)
{
return EStreamError::eErrorHandleClosed;
}
if (!pStackBuffer)
{
return EStreamError::eErrorStreamNotOpen;
@ -485,6 +495,11 @@ namespace Aurora::IO::Protocol
{
auto pStackBuffer = pStack->pSourceBufer; //pStack->AsWritableByteBuffer();
if (pStack->bDead)
{
return EStreamError::eErrorHandleClosed;
}
if (!pStackBuffer)
{
return EStreamError::eErrorStreamNotOpen;
@ -540,80 +555,7 @@ namespace Aurora::IO::Protocol
return {};
}
if (this->pStreamReaderCache)
{
return this->pStreamReaderCache;
}
struct A : AuIO::IStreamReader
{
AuWPtr<ProtocolStack> wpStack;
A(AuWPtr<ProtocolStack> wpStack) :
wpStack(wpStack)
{
}
inline virtual EStreamError IsOpen() override
{
if (auto pStack = AuTryLockMemoryType(this->wpStack))
{
auto pStackBuffer = pStack->AsReadableByteBuffer();
if (!pStackBuffer)
{
return EStreamError::eErrorStreamNotOpen;
}
else
{
return EStreamError::eErrorNone;
}
}
else
{
return EStreamError::eErrorHandleClosed;
}
}
// let's be really kind...
// let's permit the protocol-stack to be updated after our creation.
// ordinarily, one would assume this action would invalidate previous readers.
// we could support this by: reading the old buffer (preferred)
// or: reading the top buffer no matter what (pragmatic - we are a reader for this ProtocolStack stack)
// this routine implements the latter - always use the current state of the protocol stack to read.
inline virtual EStreamError Read(const Memory::MemoryViewStreamWrite &parameters) override
{
if (auto pStack = AuTryLockMemoryType(this->wpStack))
{
auto pStackBuffer = pStack->AsReadableByteBuffer();
if (!pStackBuffer)
{
return EStreamError::eErrorStreamNotOpen;
}
else
{
parameters.outVariable = pStackBuffer->Read(parameters.ptr, parameters.length);
return parameters.outVariable == 0 ? EStreamError::eErrorEndOfStream : EStreamError::eErrorNone;
}
}
else
{
return EStreamError::eErrorHandleClosed;
}
}
virtual void Close() override
{
if (auto pStack = AuTryLockMemoryType(wpStack))
{
pStack->Terminate();
}
}
};
return this->pStreamReaderCache = AuMakeShared<A>(this->SharedFromThis());
return AuSPtr<IStreamReader>(this->SharedFromThis(), &this->readView);
}
AuSPtr<Memory::ByteBuffer> ProtocolStack::AsReadableByteBuffer()
@ -637,10 +579,22 @@ namespace Aurora::IO::Protocol
{
(void)pStack->End();
}
this->bDead = true;
}
bool ProtocolStack::DoTick()
{
return this->DoTickEx(true);
}
bool ProtocolStack::DoTickEx(bool bPermitRead)
{
if (this->bDead)
{
return false;
}
if (!this->pSourceBufer)
{
return false;
@ -651,13 +605,39 @@ namespace Aurora::IO::Protocol
return false;
}
if (this->eType == EProtocolStackCreateType::eByBufferedAdhocStream)
{
if (!this->TryReadMore())
{
return false;
}
}
if (!this->DoTick(this->pSourceBufer, {}))
{
if (this->bKillPipeOnFirstRootLevelFalse)
{
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
if (this->eType == EProtocolStackCreateType::eByPipe)
{
pStack->TerminateOnThread(true);
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
{
pStack->TerminateOnThread(true);
}
}
this->bDead = true;
}
else
{
if (this->eType == EProtocolStackCreateType::eByBufferedAdhocStream)
{
if (this->TryReadMore(true))
{
if (this->DoTick(this->pSourceBufer, {}))
{
return true;
}
}
}
}
@ -667,6 +647,44 @@ namespace Aurora::IO::Protocol
return true;
}
bool ProtocolStack::TryReadMore(bool bForce)
{
if (!bForce)
{
if (this->pSourceBufer->readPtr !=
this->pSourceBufer->writePtr)
{
return true;
}
}
auto pSource = this->pStreamReaderAsInputSource;
if (!pSource)
{
return true;
}
auto uRemBytes = this->pSourceBufer->RemainingWrite();
auto uFragment = this->uPreferredFragmentSize ?
AuMin<AuUInt>(uRemBytes, this->uPreferredFragmentSize) :
uRemBytes;
AuUInt uBytesRead {};
if (AuIO::EStreamError::eErrorNone !=
this->pStreamReaderAsInputSource->Read({ AuStaticCast<AuMemoryViewWrite>(*this->pSourceBufer), uBytesRead }))
{
if (auto pStack = AuTryLockMemoryType(this->pPipeWork))
{
pStack->TerminateOnThread(true);
}
return false;
}
this->pSourceBufer->writePtr += uBytesRead;
return true;
}
bool ProtocolStack::DoTick(const AuSPtr<AuByteBuffer> &pRead, const AuSPtr<ProtocolPiece> &pPiece)
{
auto pCurrent = pPiece ? pPiece : this->pBottomPiece;
@ -803,7 +821,8 @@ namespace Aurora::IO::Protocol
AuResetMember(this->pStreamWriterCache);
}
AUKN_SYM AuSPtr<IProtocolStack> NewBufferedProtocolStack(AuUInt uLength)
static AuSPtr<IProtocolStack> _NewBufferedProtocolStack(AuUInt uLength,
ProtocolStackExtraData ex)
{
if (!uLength)
{
@ -818,6 +837,8 @@ namespace Aurora::IO::Protocol
return {};
}
pStack->eType = EProtocolStackCreateType::eByBuffered;
auto pBuffer = AuMakeShared<AuByteBuffer>(uLength, true);
if (!pBuffer)
{
@ -833,10 +854,25 @@ namespace Aurora::IO::Protocol
pStack->bOwnsSource = true;
pStack->pSourceBufer = pBuffer;
pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;
if (ex.bKillPipeOnFirstRootLevelFalse)
{
pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
}
return pStack;
}
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork, bool bAutoTick, bool bKillPipeOnFirstRootLevelFalse)
AUKN_SYM AuSPtr<IProtocolStack> NewBufferedProtocolStack(AuUInt uLength)
{
return _NewBufferedProtocolStack(uLength, {});
}
static AuSPtr<IProtocolStack> _NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork,
bool bAutoTick,
bool bKillPipeOnFirstRootLevelFalse,
ProtocolStackExtraData ex)
{
if (!pWork)
{
@ -851,11 +887,20 @@ namespace Aurora::IO::Protocol
return {};
}
pStack->eType = EProtocolStackCreateType::eByPipe;
auto pWorkEx = AuStaticCast<IOPipeWork>(pWork);
pStack->pPipeWork = pWorkEx;
pStack->bKillPipeOnFirstRootLevelFalse = bKillPipeOnFirstRootLevelFalse;
if (ex.bKillPipeOnFirstRootLevelFalse)
{
pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
}
pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;
if (bAutoTick)
{
if (pWorkEx->pProtocolStack)
@ -874,4 +919,139 @@ namespace Aurora::IO::Protocol
pStack->pSourceBufer = AuSPtr<AuByteBuffer>(pWork, AuStaticCast<IOPipeWork>(pWork)->GetBuffer());
return pStack;
}
static AuSPtr<IProtocolStack> _NewProtocolStackFromStream(const ProtocolStackByBufferedAdhoc &data,
ProtocolStackExtraData ex)
{
auto pStack = AuMakeShared<ProtocolStack>();
if (!pStack)
{
SysPushErrorMem();
return {};
}
if (!data.uBufferedStreamLength)
{
SysPushErrorArg();
return {};
}
pStack->eType = EProtocolStackCreateType::eByBufferedAdhocStream;
auto pBuffer = AuMakeShared<AuByteBuffer>(data.uBufferedStreamLength, true);
if (!pBuffer)
{
SysPushErrorMem();
return {};
}
if (!pBuffer->IsValid())
{
return {};
}
if (ex.bKillPipeOnFirstRootLevelFalse)
{
pStack->bKillPipeOnFirstRootLevelFalse = ex.bKillPipeOnFirstRootLevelFalse.value();
}
pStack->bOwnsSource = true;
pStack->pSourceBufer = pBuffer;
pStack->pStreamReaderAsInputSource = data.pStreamReader;
pStack->uPreferredFragmentSize = data.uPreferredFragmentSize;
pStack->bStreamReaderFlagAdhocPump = ex.bStreamReaderFlagAdhocPump;
pStack->bStreamReaderFlagAdhocRead = ex.bStreamReaderFlagAdhocReadFromAdhoc;
return pStack;
}
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromPipe(const AuSPtr<IIOPipeWork> &pWork,
bool bAutoTick,
bool bKillPipeOnFirstRootLevelFalse)
{
return _NewProtocolStackFromPipe(pWork, bAutoTick, bKillPipeOnFirstRootLevelFalse, {});
}
AUKN_SYM AuSPtr<IProtocolStack> NewProtocolStackFromDescription(const ProtocolStackDescription &description)
{
if (!EProtocolStackCreateTypeIsValid(description.type))
{
SysPushErrorArg();
return {};
}
switch (description.type)
{
case EProtocolStackCreateType::eByPipe:
return _NewProtocolStackFromPipe(description.byPipe.pWork, description.byPipe.bAutoTick, description.byPipe.bKillPipeOnFirstRootLevelFalse, description.extendedFlags);
case EProtocolStackCreateType::eByBuffered:
return _NewBufferedProtocolStack(description.byBuffered.uBufferedStreamLength, description.extendedFlags);
case EProtocolStackCreateType::eByBufferedAdhocStream:
return _NewProtocolStackFromStream(description.byBufferedStream, description.extendedFlags);
default:
SysUnreachable();
}
}
AUKN_SYM AuSPtr<IStreamReader> UtilityWrapStreamReaderWithInterceptors(AuSPtr<IStreamReader> pStreamReader,
AuList<AuTuple<bool /* bSingleFrame*/, AuUInt /*nextBufferSize*/, AuSPtr<IProtocolInterceptorEx>>> pairs,
AuUInt uBaseStreamLength,
AuUInt uPreferredFragmentSize,
AuSPtr<IProtocolStack> *pOptOutStack)
{
ProtocolStackDescription desc;
desc.type = EProtocolStackCreateType::eByBufferedAdhocStream;
desc.byBufferedStream.pStreamReader = pStreamReader;
desc.byBufferedStream.uBufferedStreamLength = uBaseStreamLength;
desc.byBufferedStream.uPreferredFragmentSize = uPreferredFragmentSize;
desc.extendedFlags.bStreamReaderFlagAdhocReadFromAdhoc = true;
AuSPtr<IProtocolStack> pStack;
if (!(pStack = NewProtocolStackFromDescription(desc)))
{
SysPushErrorNested();
return {};
}
for (AU_ITERATE_N(i, pairs.size()))
{
const auto &[bSingle, uNextBufferSize, pPointer] = pairs[i];
if (bSingle)
{
if (!pStack->AppendSingleFrameProcessorEx(pPointer, uNextBufferSize))
{
SysPushErrorNested();
return {};
}
}
else
{
if (i == pairs.size() - 1)
{
if (!pStack->AddEndInterceptor(pPointer))
{
SysPushErrorNested();
return {};
}
}
else
{
if (!pStack->AppendInterceptorEx(pPointer, uNextBufferSize))
{
SysPushErrorNested();
return {};
}
}
}
}
if (pOptOutStack)
{
*pOptOutStack = pStack;
}
return pStack->AsStreamReader();
}
}

View File

@ -8,6 +8,7 @@
#pragma once
#include "../AuIOPipeProcessor.hpp"
#include "AuProtocolStackSpecialReader.hpp"
namespace Aurora::IO::Protocol
{
@ -16,12 +17,17 @@ namespace Aurora::IO::Protocol
struct ProtocolStack : IProtocolStack,
AuEnableSharedFromThis<ProtocolStack>
{
ProtocolStack();
~ProtocolStack();
void Terminate();
bool DoTick() override;
bool DoTick(const AuSPtr<AuByteBuffer> &read, const AuSPtr<ProtocolPiece> &pPiece);
bool DoTickEx(bool bPermitRead = true);
bool TryReadMore(bool bForce = false);
AuSPtr<IProtocolPiece> AppendInterceptor(const AuSPtr<IProtocolInterceptor> &pInterceptor, AuUInt uOutputBufferSize) override;
AuSPtr<IProtocolPiece> AppendInterceptorEx(const AuSPtr<IProtocolInterceptorEx> &pInterceptor, AuUInt uOutputBufferSize) override;
AuSPtr<IProtocolPiece> AppendSingleFrameProcessor(const AuSPtr<IProtocolInterceptorEx> &pInterceptorEx) override;
@ -48,6 +54,12 @@ namespace Aurora::IO::Protocol
AuList<AuSPtr<IProtocolPiece>> GetArrayOfInterceptors() override;
AuSPtr<IProtocolPiece> GetInterceptorAtIndex(AuUInt32 uIndex) override;
EProtocolStackCreateType eType;
bool bDead {};
bool bStreamReaderFlagAdhocRead {};
bool bStreamReaderFlagAdhocPump {};
AuSPtr<AuByteBuffer> pSourceBufer;
bool bOwnsSource {};
@ -64,6 +76,11 @@ namespace Aurora::IO::Protocol
AuSPtr<IStreamReader> pStreamReaderCache;
AuSPtr<IStreamWriter> pStreamWriterCache;
AuSPtr<IStreamReader> pStreamReaderAsInputSource;
AuUInt uPreferredFragmentSize {};
SpecialReader readView;
void DiscardAllocCaches();
};
}

View File

@ -0,0 +1,80 @@
/***
Copyright (C) 2022-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuProtocolStackSpecialReader.cpp
File: AuProtocolStack.cpp
Date: 2022-8-24
Date: 2023-09-21
Author: Reece
***/
#include "Protocol.hpp"
#include "AuProtocolStack.hpp"
#include "AuProtocolPiece.hpp"
#include "AuProtocolStackSpecialReader.hpp"
namespace Aurora::IO::Protocol
{
EStreamError SpecialReader::IsOpen()
{
auto pStack = this->pParent;
if (pStack->bDead)
{
return EStreamError::eErrorHandleClosed;
}
auto pStackBuffer = pStack->AsReadableByteBuffer();
if (!pStackBuffer)
{
return EStreamError::eErrorStreamNotOpen;
}
else
{
return EStreamError::eErrorNone;
}
}
EStreamError SpecialReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{
auto pStack = this->pParent;
auto pStackBuffer = pStack->AsReadableByteBuffer();
if (pStack->bDead)
{
return EStreamError::eErrorHandleClosed;
}
if (!pStackBuffer)
{
return EStreamError::eErrorStreamNotOpen;
}
auto uAvail = pStackBuffer->RemainingBytes();
if (!uAvail)
{
bool bMayAdHocRead = pStack->bStreamReaderFlagAdhocRead &&
pStack->eType == EProtocolStackCreateType::eByBufferedAdhocStream;
bool bMayAdHocPump = pStack->bStreamReaderFlagAdhocPump &&
pStack->pSourceBufer &&
pStack->pSourceBufer->RemainingBytes();
if (bMayAdHocRead || bMayAdHocPump)
{
if (!pStack->DoTickEx(false))
{
return EStreamError::eErrorEndOfStream;
}
}
}
parameters.outVariable = pStackBuffer->Read(parameters.ptr, parameters.length);
return parameters.outVariable == 0 ? EStreamError::eErrorEndOfStream : EStreamError::eErrorNone;
}
void SpecialReader::Close()
{
this->pParent->Terminate();
}
}

View File

@ -0,0 +1,31 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuProtocolStackSpecialReader.hpp
Date: 2023-09-21
Author: Reece
***/
#pragma once
namespace Aurora::IO::Protocol
{
struct ProtocolStack;
struct SpecialReader : IStreamReader
{
ProtocolStack *pParent {};
inline SpecialReader()
{ }
inline SpecialReader(ProtocolStack *pParent) : pParent(pParent)
{ }
inline ~SpecialReader()
{ }
EStreamError IsOpen() override;
EStreamError Read(const Memory::MemoryViewStreamWrite &paramters) override;
void Close() override;
};
}