From 77c74f886b076446715fdc37a4ee2e1d22010497 Mon Sep 17 00:00:00 2001 From: Reece Date: Sat, 28 May 2022 12:18:00 +0100 Subject: [PATCH] [*] Update IStreamReader/Writer interfaces to be backed by an Aurora Interface [+] Aurora::IO::WriteAll [*] Improve legacy compression API (pt 2) --- Include/Aurora/IO/Buffered/BlobReader.hpp | 2 +- .../Aurora/IO/Buffered/BlobSeekableReader.hpp | 2 +- Include/Aurora/IO/Buffered/BlobWriter.hpp | 2 +- Include/Aurora/IO/FS/FileReader.hpp | 2 +- Include/Aurora/IO/FS/FileSeekableReader.hpp | 2 +- Include/Aurora/IO/FS/FileWriter.hpp | 2 +- Include/Aurora/IO/ISeekingReader.hpp | 11 +- Include/Aurora/IO/IStreamReader.hpp | 16 +- Include/Aurora/IO/IStreamReader.inl | 4 +- Include/Aurora/IO/IStreamWriter.hpp | 19 +- Include/Aurora/IO/IStreamWriter.inl | 50 +++++ Include/Aurora/Memory/MemoryView.hpp | 2 +- Source/Compression/StreamCompression.cpp | 171 ++++++++++++------ 13 files changed, 200 insertions(+), 85 deletions(-) create mode 100644 Include/Aurora/IO/IStreamWriter.inl diff --git a/Include/Aurora/IO/Buffered/BlobReader.hpp b/Include/Aurora/IO/Buffered/BlobReader.hpp index c0fa7ad2..42629444 100644 --- a/Include/Aurora/IO/Buffered/BlobReader.hpp +++ b/Include/Aurora/IO/Buffered/BlobReader.hpp @@ -20,7 +20,7 @@ namespace Aurora::IO::Buffered inline BlobReader() {} inline ~BlobReader() {} - inline virtual EStreamError Open() override + inline virtual EStreamError IsOpen() override { if (!buffer_) return EStreamError::eErrorStreamNotOpen; return EStreamError::eErrorNone; diff --git a/Include/Aurora/IO/Buffered/BlobSeekableReader.hpp b/Include/Aurora/IO/Buffered/BlobSeekableReader.hpp index 4a1545fb..ea9925ee 100644 --- a/Include/Aurora/IO/Buffered/BlobSeekableReader.hpp +++ b/Include/Aurora/IO/Buffered/BlobSeekableReader.hpp @@ -20,7 +20,7 @@ namespace Aurora::IO::Buffered inline BlobSeekableReader() {} inline ~BlobSeekableReader(){} - inline virtual EStreamError Open() override + inline virtual EStreamError IsOpen() override { if (!buffer_) return EStreamError::eErrorStreamNotOpen; return EStreamError::eErrorNone; diff --git a/Include/Aurora/IO/Buffered/BlobWriter.hpp b/Include/Aurora/IO/Buffered/BlobWriter.hpp index 83508234..13282a76 100644 --- a/Include/Aurora/IO/Buffered/BlobWriter.hpp +++ b/Include/Aurora/IO/Buffered/BlobWriter.hpp @@ -17,7 +17,7 @@ namespace Aurora::IO::Buffered inline BlobWriter() : buffer_(AuMakeShared()) {} inline ~BlobWriter() {} - inline virtual EStreamError Open() override + inline virtual EStreamError IsOpen() override { return buffer_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen; } diff --git a/Include/Aurora/IO/FS/FileReader.hpp b/Include/Aurora/IO/FS/FileReader.hpp index 5bc1faf2..fa750a28 100644 --- a/Include/Aurora/IO/FS/FileReader.hpp +++ b/Include/Aurora/IO/FS/FileReader.hpp @@ -23,7 +23,7 @@ namespace Aurora::IO::FS return stream_ != nullptr; } - inline virtual EStreamError Open() override + inline virtual EStreamError IsOpen() override { return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen; } diff --git a/Include/Aurora/IO/FS/FileSeekableReader.hpp b/Include/Aurora/IO/FS/FileSeekableReader.hpp index 9cec0432..29fc012d 100644 --- a/Include/Aurora/IO/FS/FileSeekableReader.hpp +++ b/Include/Aurora/IO/FS/FileSeekableReader.hpp @@ -23,7 +23,7 @@ namespace Aurora::IO::FS return stream_ != nullptr; } - virtual EStreamError Open() override + virtual EStreamError IsOpen() override { return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted; } diff --git a/Include/Aurora/IO/FS/FileWriter.hpp b/Include/Aurora/IO/FS/FileWriter.hpp index ebb84fba..03118a22 100644 --- a/Include/Aurora/IO/FS/FileWriter.hpp +++ b/Include/Aurora/IO/FS/FileWriter.hpp @@ -23,7 +23,7 @@ namespace Aurora::IO::FS return stream_ != nullptr; } - inline virtual EStreamError Open() override + inline virtual EStreamError IsOpen() override { return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen; } diff --git a/Include/Aurora/IO/ISeekingReader.hpp b/Include/Aurora/IO/ISeekingReader.hpp index 6c357379..c735f99f 100644 --- a/Include/Aurora/IO/ISeekingReader.hpp +++ b/Include/Aurora/IO/ISeekingReader.hpp @@ -9,10 +9,9 @@ namespace Aurora::IO { - struct ISeekingReader - { - virtual EStreamError Open() = 0; - virtual EStreamError ArbitraryRead(AuUInt offset, const Memory::MemoryViewStreamWrite ¶mters) = 0; - virtual void Close() = 0; - }; + AUKN_INTERFACE(ISeekingReader, + AUI_METHOD(EStreamError, IsOpen, ()), + AUI_METHOD(EStreamError, ArbitraryRead, (AuUInt, offset, const Memory::MemoryViewStreamWrite&, paramters)), + AUI_METHOD(void, Close, ()) + ); } \ No newline at end of file diff --git a/Include/Aurora/IO/IStreamReader.hpp b/Include/Aurora/IO/IStreamReader.hpp index 7b7897f5..5054475d 100644 --- a/Include/Aurora/IO/IStreamReader.hpp +++ b/Include/Aurora/IO/IStreamReader.hpp @@ -9,17 +9,13 @@ namespace Aurora::IO { - // TODO (Reece): Consider publishing the multiple inverted ext hack for Aurora Interfaces for binding this. - // It would be nice to have cs-style extensions on this interface. Maybe I shouldn't double down - // over a double blocking API. Maybe I should focus on the stream pumpers. - struct IStreamReader - { - virtual EStreamError Open() = 0; - virtual EStreamError Read(const Memory::MemoryViewStreamWrite ¶mters) = 0; - virtual void Close() = 0; + AUKN_INTERFACE(IStreamReader, + AUI_METHOD(EStreamError, IsOpen, ()), + AUI_METHOD(EStreamError, Read, (const Memory::MemoryViewStreamWrite&, paramters)), + AUI_METHOD(void, Close, ()) + ); - inline EStreamError ReadAll(Memory::ByteBuffer &buffer); - }; + inline EStreamError ReadAll(IStreamReader* that, Memory::ByteBuffer& buffer); } #include "IStreamReader.inl" // ReadAll utility \ No newline at end of file diff --git a/Include/Aurora/IO/IStreamReader.inl b/Include/Aurora/IO/IStreamReader.inl index 329a59aa..b984be37 100644 --- a/Include/Aurora/IO/IStreamReader.inl +++ b/Include/Aurora/IO/IStreamReader.inl @@ -9,7 +9,7 @@ namespace Aurora::IO { - EStreamError IStreamReader::ReadAll(Memory::ByteBuffer &buffer) + inline EStreamError ReadAll(IStreamReader *that, Memory::ByteBuffer &buffer) { static const int kBufferSize = 2048; @@ -22,7 +22,7 @@ namespace Aurora::IO buffer = Memory::NewResizableBuffer(); - while ((ret = Read(Memory::MemoryViewStreamWrite(temp, len))) == EStreamError::eErrorNone) + while ((ret = that->Read(Memory::MemoryViewStreamWrite(temp, len))) == EStreamError::eErrorNone) { if (len == 0) { diff --git a/Include/Aurora/IO/IStreamWriter.hpp b/Include/Aurora/IO/IStreamWriter.hpp index d0caaeb1..e5c4adb1 100644 --- a/Include/Aurora/IO/IStreamWriter.hpp +++ b/Include/Aurora/IO/IStreamWriter.hpp @@ -9,11 +9,14 @@ namespace Aurora::IO { - struct IStreamWriter - { - virtual EStreamError Open() = 0; - virtual EStreamError Write(const Memory::MemoryViewStreamRead ¶meters) = 0; - virtual void Flush() = 0; - virtual void Close() = 0; - }; -} \ No newline at end of file + AUKN_INTERFACE(IStreamWriter, + AUI_METHOD(EStreamError, IsOpen, ()), + AUI_METHOD(EStreamError, Write, (const Memory::MemoryViewStreamRead&, parameters)), + AUI_METHOD(void, Flush, ()), + AUI_METHOD(void, Close, ()) + ); + + inline EStreamError WriteAll(IStreamWriter* that, const Memory::MemoryViewStreamRead& parameters); +} + +#include "IStreamWriter.inl" \ No newline at end of file diff --git a/Include/Aurora/IO/IStreamWriter.inl b/Include/Aurora/IO/IStreamWriter.inl new file mode 100644 index 00000000..b04a876f --- /dev/null +++ b/Include/Aurora/IO/IStreamWriter.inl @@ -0,0 +1,50 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IStreamWriter.inl + Date: 2022-5-28 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + inline EStreamError WriteAll(IStreamWriter *that, const Memory::MemoryViewStreamRead ¶meters) + { + EStreamError ret = EStreamError::eErrorNone; + + AuUInt target = parameters.length; + AuUInt written = 0; + AuUInt total = 0; + + Memory::MemoryViewStreamRead copy(Memory::MemoryViewRead(parameters.ptr, target), written); + while ((target) && + (ret = that->Write(copy)) == EStreamError::eErrorNone) + { + if (written == 0 && target) + { + parameters.outVariable = written; + return EStreamError::eErrorEndOfStream; + } + + total += written; + if (total > parameters.length) + { + // !!! + parameters.outVariable = written; + return EStreamError::eErrorEndOfStream; + } + + target -= written; + + copy.ptr = copy.Begin() + written; + copy.length -= written; + + written = 0; + } + + parameters.outVariable = total; + + return ret; + } +} \ No newline at end of file diff --git a/Include/Aurora/Memory/MemoryView.hpp b/Include/Aurora/Memory/MemoryView.hpp index b0b16fd4..cfa08f63 100644 --- a/Include/Aurora/Memory/MemoryView.hpp +++ b/Include/Aurora/Memory/MemoryView.hpp @@ -108,7 +108,7 @@ namespace Aurora::Memory } template - T *End() const + AuConditional_t End() const { return Begin() + ToCount(); } diff --git a/Source/Compression/StreamCompression.cpp b/Source/Compression/StreamCompression.cpp index 67b82ae2..46bc6dd1 100644 --- a/Source/Compression/StreamCompression.cpp +++ b/Source/Compression/StreamCompression.cpp @@ -61,10 +61,15 @@ namespace Aurora::Compression return false; } + AuUInt outputStat{}, inputStat{}; AuUInt read = buffer.size(); - while ((info.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.data(), length)) == AuIO::EStreamError::eErrorNone) && length) + while ((info.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.data(), read)) == AuIO::EStreamError::eErrorNone) && read) { + inputStat += read; + ZSTD_inBuffer input = { buffer.data(), read, 0 }; + read = buffer.size(); + while (input.pos < input.size) { ZSTD_outBuffer output = { inflatedBuffer.data(), outFrameLength, 0 }; @@ -76,13 +81,22 @@ namespace Aurora::Compression return false; } - auto copy = output.pos; - info.writePipe->Write(AuMemory::MemoryViewStreamRead(output.dst, copy)); - SysAssert(copy == output.pos); + { + AuUInt copy = output.pos; + auto err = AuIO::WriteAll(info.writePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy)); + outputStat += output.pos; + + if (copy != output.pos) + { + SysPushErrorIO(); + return false; + } + } + if (info.reportProgress) { - if (!info.reportProgress(input.pos, output.pos)) + if (!info.reportProgress(inputStat, outputStat)) { ZSTD_freeDCtx(dctx); return false; @@ -161,14 +175,15 @@ namespace Aurora::Compression bool status = true; size_t const toRead = buffInSize; - + AuUInt outputStat {}, inputStat {}; while (true) { AuUInt read = buffInSize; - if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(inflatedBuffer.data(), read)) != AuIO::EStreamError::eErrorNone || !read) + if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(inflatedBuffer.data(), read)) != AuIO::EStreamError::eErrorNone) { break; } + inputStat += read; /* Select the flush mode. * If the read may not be finished (read == toRead) we use @@ -198,13 +213,21 @@ namespace Aurora::Compression return false; } - auto copy = output.pos; - stream.writePipe->Write(AuMemory::MemoryViewStreamRead(output.dst, copy)); - SysAssert(copy == output.pos); + { + AuUInt copy = output.pos; + auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy)); + outputStat += output.pos; + + if (copy != output.pos) + { + SysPushErrorIO(); + return false; + } + } if (stream.reportProgress) { - if (!stream.reportProgress(input.pos, output.pos)) + if (!stream.reportProgress(inputStat, outputStat)) { ZSTD_freeCCtx(cctx); return false; @@ -256,7 +279,9 @@ namespace Aurora::Compression do { AuUInt read = AuArraySize(in); - if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) + AuIO::EStreamError error; + if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) && + (error != AuIO::EStreamError::eErrorEndOfStream)) { break; } @@ -265,7 +290,7 @@ namespace Aurora::Compression inputStat += read; - flush = !strm.avail_in ? Z_FINISH : Z_NO_FLUSH; + flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? Z_FINISH : Z_NO_FLUSH; strm.next_in = in; do @@ -282,11 +307,18 @@ namespace Aurora::Compression auto have = AuArraySize(out) - strm.avail_out; - AuUInt copy = have; - stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); - SysAssert(copy == have); + { + AuUInt copy = have; + auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); + outputStat += copy; + + if (copy != have) + { + SysPushErrorIO(); + return false; + } + } - outputStat += have; if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) @@ -334,7 +366,7 @@ namespace Aurora::Compression do { AuUInt read = AuArraySize(in); - if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) + if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone) { break; } @@ -363,12 +395,18 @@ namespace Aurora::Compression auto have = AuArraySize(out) - strm.avail_out; - AuUInt copy = have; - stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); - SysAssert(copy == have); + { + AuUInt copy = have; + auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); + outputStat += copy; + if (copy != have) + { + SysPushErrorIO(); + return false; + } + } - outputStat += have; if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) @@ -416,17 +454,18 @@ namespace Aurora::Compression do { - AuUInt read = AuArraySize(in); - if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) + AuIO::EStreamError error; + AuUInt read{ kChunkSize }; + if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) && + (error != AuIO::EStreamError::eErrorEndOfStream)) { break; } strm.avail_in = read; - inputStat += read; - flush = !strm.avail_in ? BZ_FINISH : BZ_RUN; + flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? BZ_FINISH : BZ_RUN; strm.next_in = in; do @@ -444,12 +483,17 @@ namespace Aurora::Compression auto have = AuArraySize(out) - strm.avail_out; - AuUInt copy = have; - stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); - SysAssert(copy == have); + { + AuUInt copy = have; + auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); + outputStat += copy; - - outputStat += have; + if (copy != have) + { + SysPushErrorIO(); + return false; + } + } if (stream.reportProgress) { @@ -502,7 +546,7 @@ namespace Aurora::Compression do { AuUInt read = AuArraySize(in); - if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) + if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone) { break; } @@ -531,11 +575,18 @@ namespace Aurora::Compression auto have = AuArraySize(out) - strm.avail_out; - AuUInt copy = have; - stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); - SysAssert(copy == have); + { + AuUInt copy = have; + auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); + outputStat += copy; + + if (copy != have) + { + SysPushErrorIO(); + return false; + } + } - outputStat += have; if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) @@ -592,14 +643,17 @@ namespace Aurora::Compression while (true) { - AuUInt read = maxFrameSize; - if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.get(), read)) != AuIO::EStreamError::eErrorNone || !read) + AuIO::EStreamError error; + if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.get(), read))) != AuIO::EStreamError::eErrorNone) && + (error != AuIO::EStreamError::eErrorEndOfStream)) { break; } - if (read) + bool flush = (!read || error == AuIO::EStreamError::eErrorEndOfStream); + + if (!flush) { AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options); @@ -609,21 +663,30 @@ namespace Aurora::Compression break; } - AuUInt copy = bufferedBytes; - stream.writePipe->Write(AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); - SysAssert(copy == bufferedBytes); + auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); + outputStat += copy; + + if (copy != bufferedBytes) + { + SysPushErrorIO(); + return false; + } - outputStat += bufferedBytes; } else { AuUInt32 bufferedBytes = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options); - AuUInt copy = bufferedBytes; - stream.writePipe->Write(AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); - SysAssert(copy == bufferedBytes); + auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); + outputStat += copy; + + if (copy != bufferedBytes) + { + SysPushErrorIO(); + return false; + } break; } @@ -688,7 +751,7 @@ namespace Aurora::Compression { AuUInt read = frameSize; - if (pipe.inPipe->Read(AuMemory::MemoryViewStreamWrite(bufferIn.get(), read)) != AuIO::EStreamError::eErrorNone || !read) + if (pipe.inPipe->Read(AuMemory::MemoryViewStreamWrite(bufferIn.get(), read)) != AuIO::EStreamError::eErrorNone) { break; } @@ -718,13 +781,17 @@ namespace Aurora::Compression if (frameS2Ptr) { AuUInt copy = frameS2Ptr; - pipe.writePipe->Write(AuMemory::MemoryViewStreamRead(bufferOut.get(), copy)); - SysAssert(copy == frameS2Ptr); + auto err = AuIO::WriteAll(pipe.writePipe.get(), AuMemory::MemoryViewStreamRead(bufferOut.get(), copy)); + outputStat += copy; + + if (copy != frameS2Ptr) + { + SysPushErrorIO(); + return false; + } + } - outputStat += frameS2Ptr; - - if (pipe.reportProgress) { if (!pipe.reportProgress(inputStat, outputStat))