[*] Modernize IOBufferedProcessor (and moved to the io utility namespace)

[+] AuIO::Utility::NewBufferedProcessorEx
[+] AuByteBuffer::calcDifferenceBetweenHeadsUnsigned
[+] AuByteBuffer::calcDifferenceBetweenHeadsSigned
This commit is contained in:
Reece Wilson 2023-09-28 12:22:42 +01:00
parent 742c0495d8
commit 635f9e39e9
6 changed files with 214 additions and 140 deletions

View File

@ -42,7 +42,7 @@
//#include "IBufferedReadProcessorToReader.hpp" //#include "IBufferedReadProcessorToReader.hpp"
#include "IIOPipeInterceptor.hpp" #include "IIOPipeInterceptor.hpp"
#include "IIOBufferedProcessor.hpp" #include "Utility/IIOBufferedProcessor.hpp"
#include "Utility/IByteBufferStreamPair.hpp" #include "Utility/IByteBufferStreamPair.hpp"

View File

@ -7,7 +7,9 @@
***/ ***/
#pragma once #pragma once
namespace Aurora::IO #include "../Protocol/IProtocolInterceptorEx.hpp"
namespace Aurora::IO::Utility
{ {
struct IIOBufferedProcessor struct IIOBufferedProcessor
{ {
@ -22,11 +24,15 @@ namespace Aurora::IO
* @param processor * @param processor
* @param drain * @param drain
* @param bufferSize * @param bufferSize
* @deprecated by Aurora::IO::Protocol (tobe AuProtocol:: and AuIOProtocol::)
* @return * @return
*/ */
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &pSource, AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &pSource,
const AuSPtr<IIOPipeInterceptor> &pProcessor, const AuSPtr<IIOPipeInterceptor> &pProcessor,
const AuSPtr<IStreamWriter> &pDrain, // TODO: share IOPipeCallback const AuSPtr<IStreamWriter> &pDrain,
AuUInt32 uBufferSize); AuUInt32 uBufferSize);
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessorEx(const AuSPtr<IStreamReader> &pSource,
const AuSPtr<Protocol::IProtocolInterceptorEx> &pProcessor,
const AuSPtr<IStreamWriter> &pDrain,
AuUInt32 uBufferSize);
} }

View File

@ -527,6 +527,32 @@ namespace Aurora::Memory
template<typename T> template<typename T>
bool ReadTagged(T &out); bool ReadTagged(T &out);
//
inline AuUInt calcDifferenceBetweenHeadsUnsigned(AuUInt8 *pHeadPointer, AuUInt8 *pSubtrahend)
{
if (pHeadPointer > pSubtrahend)
{
return pHeadPointer - pSubtrahend;
}
else
{
return (pSubtrahend - this->base) + ((this->base + this->length) - pHeadPointer);
}
}
inline AuSInt calcDifferenceBetweenHeadsSigned(AuUInt8 *pHeadPointer, AuUInt8 *pSubtrahend)
{
if (pHeadPointer > pSubtrahend)
{
return pHeadPointer - pSubtrahend;
}
else
{
return -(pSubtrahend - this->base);
}
}
}; };
static ByteBuffer NewResizableBuffer(AuUInt32 length = 0) static ByteBuffer NewResizableBuffer(AuUInt32 length = 0)

View File

@ -1,135 +0,0 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOBufferedProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "AuIOBufferedProcessor.hpp"
namespace Aurora::IO
{
struct IOBufferedProcessor : IIOBufferedProcessor
{
AuSPtr<IStreamReader> pSource;
AuSPtr<IStreamWriter> pDrain;
AuSPtr<IIOPipeInterceptor> pProcessor;
AuUInt32 uBufferSize {};
AuByteBuffer buffer;
AU_DEFINE_FOR_VA(IOBufferedProcessor,
(AU_DEFINE_CTOR_VA, // initializer-list-like ctor (extending a struct or adding a ctor will break initializer lists)
AU_DEFINE_THIS_MOVE_CTOR_VA, // add move `Object(Object &&)`
AU_DEFINE_EQUALS_VA, // add equals operator
AU_DEFINE_MOVE_VA, // add move assignment operator
AU_DEFINE_COPY_VA), // add copy assignment operator
(pSource, pDrain, pProcessor, uBufferSize));
AuUInt32 TryProcessBuffered() override;
AuUInt32 GetRawBytesBuffered() override;
AuUInt32 GetRawBytesLimit() override;
AuUInt32 TryPump();
};
AuUInt32 IOBufferedProcessor::TryProcessBuffered()
{
if (this->buffer.IsEmpty())
{
this->buffer.Allocate(this->uBufferSize);
this->buffer.flagCircular = true; // !!!
}
if (this->buffer.IsEmpty())
{
return TryPump();
}
AuUInt canBuffer = this->buffer.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer.length + this->buffer.base) - this->buffer.writePtr));
AuUInt read {};
try
{
if (this->pSource->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) !=
AuIO::EStreamError::eErrorNone)
{
return TryPump();
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer.writePtr += read;
if (this->buffer.writePtr == this->buffer.base + this->buffer.length)
{
this->buffer.writePtr = this->buffer.base;
}
return TryPump();
}
AuUInt32 IOBufferedProcessor::TryPump()
{
AuUInt bytesProcessedTotal {};
AuUInt bytesProcessed {};
do
{
if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length)
{
this->buffer.readPtr = this->buffer.base;
}
AuUInt canRead = this->buffer.RemainingBytes();
canRead = AuMin<AuUInt>(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr);
try
{
if (!this->pProcessor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->pDrain))
{
break;
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer.readPtr += bytesProcessed;
bytesProcessedTotal += bytesProcessed;
if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length)
{
this->buffer.readPtr = this->buffer.base;
}
}
while (AuExchange(bytesProcessed, 0));
return bytesProcessedTotal;
}
AuUInt32 IOBufferedProcessor::GetRawBytesBuffered()
{
return this->buffer.RemainingBytes();
}
AuUInt32 IOBufferedProcessor::GetRawBytesLimit()
{
return this->uBufferSize;
}
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &pSource,
const AuSPtr<IIOPipeInterceptor> &pProcessor,
const AuSPtr<IStreamWriter> &pDrain,
AuUInt32 uBufferSize)
{
return AuMakeShared<IOBufferedProcessor>(pSource, pDrain, pProcessor, uBufferSize);
}
}

View File

@ -0,0 +1,177 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOBufferedProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "AuIOBufferedProcessor.hpp"
namespace Aurora::IO::Utility
{
struct IOBufferedProcessor :
IIOBufferedProcessor,
AuEnableSharedFromThis<IOBufferedProcessor>
{
AuSPtr<IStreamReader> pSource;
AuSPtr<IStreamWriter> pDrain;
AuSPtr<IIOPipeInterceptor> pProcessor;
AuSPtr<Protocol::IProtocolInterceptorEx> pProcessorEx;
AuUInt32 uBufferSize {};
AuByteBuffer buffer;
AuByteBuffer buffer2;
bool bErrored {};
AU_DEFINE_FOR_VA(IOBufferedProcessor,
(AU_DEFINE_CTOR_VA, // initializer-list-like ctor (extending a struct or adding a ctor will break initializer lists)
AU_DEFINE_THIS_MOVE_CTOR_VA, // add move `Object(Object &&)`
AU_DEFINE_EQUALS_VA, // add equals operator
AU_DEFINE_MOVE_VA, // add move assignment operator
AU_DEFINE_COPY_VA), // add copy assignment operator
(pSource, pDrain, pProcessor, uBufferSize));
IOBufferedProcessor(AuSPtr<IStreamReader> pSource,
AuSPtr<IStreamWriter> pDrain,
AuSPtr<Protocol::IProtocolInterceptorEx> pProcessor,
AuUInt32 uBufferSize) :
pSource(pSource),
pDrain(pDrain),
pProcessorEx(pProcessor),
uBufferSize(uBufferSize)
{
}
AuUInt32 TryProcessBuffered() override;
AuUInt32 GetRawBytesBuffered() override;
AuUInt32 GetRawBytesLimit() override;
AuUInt32 TryPump();
};
AuUInt32 IOBufferedProcessor::TryProcessBuffered()
{
if (this->buffer.IsEmpty())
{
this->buffer.Allocate(this->uBufferSize);
this->buffer.flagCircular = true;
if (this->pProcessorEx)
{
this->buffer2.Allocate(this->uBufferSize);
this->buffer2.flagCircular = true;
}
}
if (!this->buffer ||
(this->pProcessorEx && !this->buffer2))
{
return TryPump();
}
AuUInt read {};
try
{
if (this->pSource->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer), read)) !=
AuIO::EStreamError::eErrorNone)
{
return this->TryPump();
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer.writePtr += read;
return this->TryPump();
}
AuUInt32 IOBufferedProcessor::TryPump()
{
AuUInt uBytesProcessedTotal {};
AuUInt uBytesProcessed {};
if (this->bErrored)
{
return 0;
}
do
{
try
{
if (this->pProcessor)
{
if (!this->pProcessor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer), uBytesProcessed), this->pDrain))
{
break;
}
}
else if (this->pProcessorEx)
{
auto pRead = this->buffer.readPtr;
if (!this->pProcessorEx->OnDataAvailable(AuSharedPointerFromThis(&this->buffer), AuSharedPointerFromThis(&this->buffer2), {}))
{
this->buffer.readPtr = pRead;
break;
}
AuUInt uWritten {};
if (AuIO::WriteAll(this->pDrain.get(), { AuMemoryViewRead(this->buffer2), uWritten }) != AuIO::EStreamError::eErrorNone)
{
this->bErrored = true;
}
this->buffer2.readPtr += uWritten;
uBytesProcessed = this->buffer.calcDifferenceBetweenHeadsUnsigned(this->buffer.readPtr, pRead);
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer.readPtr += uBytesProcessed;
uBytesProcessedTotal += uBytesProcessed;
}
while (AuExchange(uBytesProcessedTotal, 0));
return uBytesProcessedTotal;
}
AuUInt32 IOBufferedProcessor::GetRawBytesBuffered()
{
return this->buffer.RemainingBytes();
}
AuUInt32 IOBufferedProcessor::GetRawBytesLimit()
{
return this->uBufferSize;
}
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &pSource,
const AuSPtr<IIOPipeInterceptor> &pProcessor,
const AuSPtr<IStreamWriter> &pDrain,
AuUInt32 uBufferSize)
{
SysCheckArgNotNull(pProcessor, {});
SysCheckArgNotNull(pDrain, {});
SysCheckArgNotNull(pSource, {});
return AuMakeShared<IOBufferedProcessor>(pSource, pDrain, pProcessor, uBufferSize);
}
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessorEx(const AuSPtr<IStreamReader> &pSource,
const AuSPtr<Protocol::IProtocolInterceptorEx> &pProcessor,
const AuSPtr<IStreamWriter> &pDrain,
AuUInt32 uBufferSize)
{
SysCheckArgNotNull(pProcessor, {});
SysCheckArgNotNull(pDrain, {});
SysCheckArgNotNull(pSource, {});
return AuMakeShared<IOBufferedProcessor>(pSource, pDrain, pProcessor, uBufferSize);
}
}

View File

@ -7,7 +7,7 @@
***/ ***/
#pragma once #pragma once
namespace Aurora::IO namespace Aurora::IO::Utility
{ {
} }