diff --git a/Include/Aurora/IO/IOExperimental.hpp b/Include/Aurora/IO/IOExperimental.hpp index dfd1984e..b90e7f77 100644 --- a/Include/Aurora/IO/IOExperimental.hpp +++ b/Include/Aurora/IO/IOExperimental.hpp @@ -42,7 +42,7 @@ //#include "IBufferedReadProcessorToReader.hpp" #include "IIOPipeInterceptor.hpp" -#include "IIOBufferedProcessor.hpp" +#include "Utility/IIOBufferedProcessor.hpp" #include "Utility/IByteBufferStreamPair.hpp" diff --git a/Include/Aurora/IO/IIOBufferedProcessor.hpp b/Include/Aurora/IO/Utility/IIOBufferedProcessor.hpp similarity index 61% rename from Include/Aurora/IO/IIOBufferedProcessor.hpp rename to Include/Aurora/IO/Utility/IIOBufferedProcessor.hpp index 77615418..5dfdd766 100644 --- a/Include/Aurora/IO/IIOBufferedProcessor.hpp +++ b/Include/Aurora/IO/Utility/IIOBufferedProcessor.hpp @@ -7,7 +7,9 @@ ***/ #pragma once -namespace Aurora::IO +#include "../Protocol/IProtocolInterceptorEx.hpp" + +namespace Aurora::IO::Utility { struct IIOBufferedProcessor { @@ -22,11 +24,15 @@ namespace Aurora::IO * @param processor * @param drain * @param bufferSize - * @deprecated by Aurora::IO::Protocol (tobe AuProtocol:: and AuIOProtocol::) * @return */ AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &pSource, const AuSPtr &pProcessor, - const AuSPtr &pDrain, // TODO: share IOPipeCallback + const AuSPtr &pDrain, AuUInt32 uBufferSize); + + AUKN_SYM AuSPtr NewBufferedProcessorEx(const AuSPtr &pSource, + const AuSPtr &pProcessor, + const AuSPtr &pDrain, + AuUInt32 uBufferSize); } \ No newline at end of file diff --git a/Include/Aurora/Memory/ByteBuffer.hpp b/Include/Aurora/Memory/ByteBuffer.hpp index 13f40738..76240027 100644 --- a/Include/Aurora/Memory/ByteBuffer.hpp +++ b/Include/Aurora/Memory/ByteBuffer.hpp @@ -527,6 +527,32 @@ namespace Aurora::Memory template bool ReadTagged(T &out); + + // + + inline AuUInt calcDifferenceBetweenHeadsUnsigned(AuUInt8 *pHeadPointer, AuUInt8 *pSubtrahend) + { + if (pHeadPointer > pSubtrahend) + { + return pHeadPointer - pSubtrahend; + } + else + { + return (pSubtrahend - this->base) + ((this->base + this->length) - pHeadPointer); + } + } + + inline AuSInt calcDifferenceBetweenHeadsSigned(AuUInt8 *pHeadPointer, AuUInt8 *pSubtrahend) + { + if (pHeadPointer > pSubtrahend) + { + return pHeadPointer - pSubtrahend; + } + else + { + return -(pSubtrahend - this->base); + } + } }; static ByteBuffer NewResizableBuffer(AuUInt32 length = 0) diff --git a/Source/IO/AuIOBufferedProcessor.cpp b/Source/IO/AuIOBufferedProcessor.cpp deleted file mode 100644 index cb58c1fc..00000000 --- a/Source/IO/AuIOBufferedProcessor.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/*** - Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. - - File: AuIOBufferedProcessor.cpp - Date: 2022-6-6 - Author: Reece -***/ -#include -#include -#include "AuIOBufferedProcessor.hpp" - -namespace Aurora::IO -{ - struct IOBufferedProcessor : IIOBufferedProcessor - { - AuSPtr pSource; - AuSPtr pDrain; - AuSPtr pProcessor; - AuUInt32 uBufferSize {}; - AuByteBuffer buffer; - - AU_DEFINE_FOR_VA(IOBufferedProcessor, - (AU_DEFINE_CTOR_VA, // initializer-list-like ctor (extending a struct or adding a ctor will break initializer lists) - AU_DEFINE_THIS_MOVE_CTOR_VA, // add move `Object(Object &&)` - AU_DEFINE_EQUALS_VA, // add equals operator - AU_DEFINE_MOVE_VA, // add move assignment operator - AU_DEFINE_COPY_VA), // add copy assignment operator - (pSource, pDrain, pProcessor, uBufferSize)); - - - AuUInt32 TryProcessBuffered() override; - AuUInt32 GetRawBytesBuffered() override; - AuUInt32 GetRawBytesLimit() override; - - AuUInt32 TryPump(); - }; - - AuUInt32 IOBufferedProcessor::TryProcessBuffered() - { - if (this->buffer.IsEmpty()) - { - this->buffer.Allocate(this->uBufferSize); - this->buffer.flagCircular = true; // !!! - } - - if (this->buffer.IsEmpty()) - { - return TryPump(); - } - - AuUInt canBuffer = this->buffer.RemainingWrite(); - canBuffer = AuMin(canBuffer, AuUInt((this->buffer.length + this->buffer.base) - this->buffer.writePtr)); - - AuUInt read {}; - try - { - if (this->pSource->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) != - AuIO::EStreamError::eErrorNone) - { - return TryPump(); - } - } - catch (...) - { - SysPushErrorCatch(); - } - - this->buffer.writePtr += read; - - if (this->buffer.writePtr == this->buffer.base + this->buffer.length) - { - this->buffer.writePtr = this->buffer.base; - } - - return TryPump(); - } - - AuUInt32 IOBufferedProcessor::TryPump() - { - AuUInt bytesProcessedTotal {}; - AuUInt bytesProcessed {}; - - do - { - if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length) - { - this->buffer.readPtr = this->buffer.base; - } - - AuUInt canRead = this->buffer.RemainingBytes(); - canRead = AuMin(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr); - - try - { - if (!this->pProcessor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->pDrain)) - { - break; - } - } - catch (...) - { - SysPushErrorCatch(); - } - - this->buffer.readPtr += bytesProcessed; - bytesProcessedTotal += bytesProcessed; - - if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length) - { - this->buffer.readPtr = this->buffer.base; - } - } - while (AuExchange(bytesProcessed, 0)); - - return bytesProcessedTotal; - } - - AuUInt32 IOBufferedProcessor::GetRawBytesBuffered() - { - return this->buffer.RemainingBytes(); - } - - AuUInt32 IOBufferedProcessor::GetRawBytesLimit() - { - return this->uBufferSize; - } - - AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &pSource, - const AuSPtr &pProcessor, - const AuSPtr &pDrain, - AuUInt32 uBufferSize) - { - return AuMakeShared(pSource, pDrain, pProcessor, uBufferSize); - } -} \ No newline at end of file diff --git a/Source/IO/Utility/AuIOBufferedProcessor.cpp b/Source/IO/Utility/AuIOBufferedProcessor.cpp new file mode 100644 index 00000000..4ce15c6e --- /dev/null +++ b/Source/IO/Utility/AuIOBufferedProcessor.cpp @@ -0,0 +1,177 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuIOBufferedProcessor.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "AuIOBufferedProcessor.hpp" + +namespace Aurora::IO::Utility +{ + struct IOBufferedProcessor : + IIOBufferedProcessor, + AuEnableSharedFromThis + { + AuSPtr pSource; + AuSPtr pDrain; + AuSPtr pProcessor; + AuSPtr pProcessorEx; + AuUInt32 uBufferSize {}; + AuByteBuffer buffer; + AuByteBuffer buffer2; + bool bErrored {}; + + AU_DEFINE_FOR_VA(IOBufferedProcessor, + (AU_DEFINE_CTOR_VA, // initializer-list-like ctor (extending a struct or adding a ctor will break initializer lists) + AU_DEFINE_THIS_MOVE_CTOR_VA, // add move `Object(Object &&)` + AU_DEFINE_EQUALS_VA, // add equals operator + AU_DEFINE_MOVE_VA, // add move assignment operator + AU_DEFINE_COPY_VA), // add copy assignment operator + (pSource, pDrain, pProcessor, uBufferSize)); + + IOBufferedProcessor(AuSPtr pSource, + AuSPtr pDrain, + AuSPtr pProcessor, + AuUInt32 uBufferSize) : + pSource(pSource), + pDrain(pDrain), + pProcessorEx(pProcessor), + uBufferSize(uBufferSize) + { + + } + + AuUInt32 TryProcessBuffered() override; + AuUInt32 GetRawBytesBuffered() override; + AuUInt32 GetRawBytesLimit() override; + + AuUInt32 TryPump(); + }; + + AuUInt32 IOBufferedProcessor::TryProcessBuffered() + { + if (this->buffer.IsEmpty()) + { + this->buffer.Allocate(this->uBufferSize); + this->buffer.flagCircular = true; + + if (this->pProcessorEx) + { + this->buffer2.Allocate(this->uBufferSize); + this->buffer2.flagCircular = true; + } + } + + if (!this->buffer || + (this->pProcessorEx && !this->buffer2)) + { + return TryPump(); + } + + AuUInt read {}; + try + { + if (this->pSource->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer), read)) != + AuIO::EStreamError::eErrorNone) + { + return this->TryPump(); + } + } + catch (...) + { + SysPushErrorCatch(); + } + + this->buffer.writePtr += read; + + return this->TryPump(); + } + + AuUInt32 IOBufferedProcessor::TryPump() + { + AuUInt uBytesProcessedTotal {}; + AuUInt uBytesProcessed {}; + + if (this->bErrored) + { + return 0; + } + + do + { + try + { + if (this->pProcessor) + { + if (!this->pProcessor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer), uBytesProcessed), this->pDrain)) + { + break; + } + } + else if (this->pProcessorEx) + { + auto pRead = this->buffer.readPtr; + if (!this->pProcessorEx->OnDataAvailable(AuSharedPointerFromThis(&this->buffer), AuSharedPointerFromThis(&this->buffer2), {})) + { + this->buffer.readPtr = pRead; + break; + } + + AuUInt uWritten {}; + if (AuIO::WriteAll(this->pDrain.get(), { AuMemoryViewRead(this->buffer2), uWritten }) != AuIO::EStreamError::eErrorNone) + { + this->bErrored = true; + } + + this->buffer2.readPtr += uWritten; + uBytesProcessed = this->buffer.calcDifferenceBetweenHeadsUnsigned(this->buffer.readPtr, pRead); + } + } + catch (...) + { + SysPushErrorCatch(); + } + + this->buffer.readPtr += uBytesProcessed; + uBytesProcessedTotal += uBytesProcessed; + } + while (AuExchange(uBytesProcessedTotal, 0)); + + return uBytesProcessedTotal; + } + + AuUInt32 IOBufferedProcessor::GetRawBytesBuffered() + { + return this->buffer.RemainingBytes(); + } + + AuUInt32 IOBufferedProcessor::GetRawBytesLimit() + { + return this->uBufferSize; + } + + AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &pSource, + const AuSPtr &pProcessor, + const AuSPtr &pDrain, + AuUInt32 uBufferSize) + { + SysCheckArgNotNull(pProcessor, {}); + SysCheckArgNotNull(pDrain, {}); + SysCheckArgNotNull(pSource, {}); + return AuMakeShared(pSource, pDrain, pProcessor, uBufferSize); + } + + AUKN_SYM AuSPtr NewBufferedProcessorEx(const AuSPtr &pSource, + const AuSPtr &pProcessor, + const AuSPtr &pDrain, + AuUInt32 uBufferSize) + { + SysCheckArgNotNull(pProcessor, {}); + SysCheckArgNotNull(pDrain, {}); + SysCheckArgNotNull(pSource, {}); + return AuMakeShared(pSource, pDrain, pProcessor, uBufferSize); + } +} \ No newline at end of file diff --git a/Source/IO/AuIOBufferedProcessor.hpp b/Source/IO/Utility/AuIOBufferedProcessor.hpp similarity index 84% rename from Source/IO/AuIOBufferedProcessor.hpp rename to Source/IO/Utility/AuIOBufferedProcessor.hpp index c10dfdbb..5e1bd7bf 100644 --- a/Source/IO/AuIOBufferedProcessor.hpp +++ b/Source/IO/Utility/AuIOBufferedProcessor.hpp @@ -7,7 +7,7 @@ ***/ #pragma once -namespace Aurora::IO +namespace Aurora::IO::Utility { } \ No newline at end of file