[*] Update IStreamReader/Writer interfaces to be backed by an Aurora Interface

[+] Aurora::IO::WriteAll
[*] Improve legacy compression API (pt 2)
This commit is contained in:
Reece Wilson 2022-05-28 12:18:00 +01:00
parent 25a593cac5
commit 77c74f886b
13 changed files with 200 additions and 85 deletions

View File

@ -20,7 +20,7 @@ namespace Aurora::IO::Buffered
inline BlobReader() {} inline BlobReader() {}
inline ~BlobReader() {} inline ~BlobReader() {}
inline virtual EStreamError Open() override inline virtual EStreamError IsOpen() override
{ {
if (!buffer_) return EStreamError::eErrorStreamNotOpen; if (!buffer_) return EStreamError::eErrorStreamNotOpen;
return EStreamError::eErrorNone; return EStreamError::eErrorNone;

View File

@ -20,7 +20,7 @@ namespace Aurora::IO::Buffered
inline BlobSeekableReader() {} inline BlobSeekableReader() {}
inline ~BlobSeekableReader(){} inline ~BlobSeekableReader(){}
inline virtual EStreamError Open() override inline virtual EStreamError IsOpen() override
{ {
if (!buffer_) return EStreamError::eErrorStreamNotOpen; if (!buffer_) return EStreamError::eErrorStreamNotOpen;
return EStreamError::eErrorNone; return EStreamError::eErrorNone;

View File

@ -17,7 +17,7 @@ namespace Aurora::IO::Buffered
inline BlobWriter() : buffer_(AuMakeShared<Memory::ByteBuffer>()) {} inline BlobWriter() : buffer_(AuMakeShared<Memory::ByteBuffer>()) {}
inline ~BlobWriter() {} inline ~BlobWriter() {}
inline virtual EStreamError Open() override inline virtual EStreamError IsOpen() override
{ {
return buffer_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen; return buffer_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen;
} }

View File

@ -23,7 +23,7 @@ namespace Aurora::IO::FS
return stream_ != nullptr; return stream_ != nullptr;
} }
inline virtual EStreamError Open() override inline virtual EStreamError IsOpen() override
{ {
return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen; return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen;
} }

View File

@ -23,7 +23,7 @@ namespace Aurora::IO::FS
return stream_ != nullptr; return stream_ != nullptr;
} }
virtual EStreamError Open() override virtual EStreamError IsOpen() override
{ {
return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted; return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted;
} }

View File

@ -23,7 +23,7 @@ namespace Aurora::IO::FS
return stream_ != nullptr; return stream_ != nullptr;
} }
inline virtual EStreamError Open() override inline virtual EStreamError IsOpen() override
{ {
return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen; return stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamNotOpen;
} }

View File

@ -9,10 +9,9 @@
namespace Aurora::IO namespace Aurora::IO
{ {
struct ISeekingReader AUKN_INTERFACE(ISeekingReader,
{ AUI_METHOD(EStreamError, IsOpen, ()),
virtual EStreamError Open() = 0; AUI_METHOD(EStreamError, ArbitraryRead, (AuUInt, offset, const Memory::MemoryViewStreamWrite&, paramters)),
virtual EStreamError ArbitraryRead(AuUInt offset, const Memory::MemoryViewStreamWrite &paramters) = 0; AUI_METHOD(void, Close, ())
virtual void Close() = 0; );
};
} }

View File

@ -9,17 +9,13 @@
namespace Aurora::IO namespace Aurora::IO
{ {
// TODO (Reece): Consider publishing the multiple inverted ext hack for Aurora Interfaces for binding this. AUKN_INTERFACE(IStreamReader,
// It would be nice to have cs-style extensions on this interface. Maybe I shouldn't double down AUI_METHOD(EStreamError, IsOpen, ()),
// over a double blocking API. Maybe I should focus on the stream pumpers. AUI_METHOD(EStreamError, Read, (const Memory::MemoryViewStreamWrite&, paramters)),
struct IStreamReader AUI_METHOD(void, Close, ())
{ );
virtual EStreamError Open() = 0;
virtual EStreamError Read(const Memory::MemoryViewStreamWrite &paramters) = 0;
virtual void Close() = 0;
inline EStreamError ReadAll(Memory::ByteBuffer &buffer); inline EStreamError ReadAll(IStreamReader* that, Memory::ByteBuffer& buffer);
};
} }
#include "IStreamReader.inl" // ReadAll utility #include "IStreamReader.inl" // ReadAll utility

View File

@ -9,7 +9,7 @@
namespace Aurora::IO namespace Aurora::IO
{ {
EStreamError IStreamReader::ReadAll(Memory::ByteBuffer &buffer) inline EStreamError ReadAll(IStreamReader *that, Memory::ByteBuffer &buffer)
{ {
static const int kBufferSize = 2048; static const int kBufferSize = 2048;
@ -22,7 +22,7 @@ namespace Aurora::IO
buffer = Memory::NewResizableBuffer(); buffer = Memory::NewResizableBuffer();
while ((ret = Read(Memory::MemoryViewStreamWrite(temp, len))) == EStreamError::eErrorNone) while ((ret = that->Read(Memory::MemoryViewStreamWrite(temp, len))) == EStreamError::eErrorNone)
{ {
if (len == 0) if (len == 0)
{ {

View File

@ -9,11 +9,14 @@
namespace Aurora::IO namespace Aurora::IO
{ {
struct IStreamWriter AUKN_INTERFACE(IStreamWriter,
{ AUI_METHOD(EStreamError, IsOpen, ()),
virtual EStreamError Open() = 0; AUI_METHOD(EStreamError, Write, (const Memory::MemoryViewStreamRead&, parameters)),
virtual EStreamError Write(const Memory::MemoryViewStreamRead &parameters) = 0; AUI_METHOD(void, Flush, ()),
virtual void Flush() = 0; AUI_METHOD(void, Close, ())
virtual void Close() = 0; );
};
inline EStreamError WriteAll(IStreamWriter* that, const Memory::MemoryViewStreamRead& parameters);
} }
#include "IStreamWriter.inl"

View File

@ -0,0 +1,50 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IStreamWriter.inl
Date: 2022-5-28
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
inline EStreamError WriteAll(IStreamWriter *that, const Memory::MemoryViewStreamRead &parameters)
{
EStreamError ret = EStreamError::eErrorNone;
AuUInt target = parameters.length;
AuUInt written = 0;
AuUInt total = 0;
Memory::MemoryViewStreamRead copy(Memory::MemoryViewRead(parameters.ptr, target), written);
while ((target) &&
(ret = that->Write(copy)) == EStreamError::eErrorNone)
{
if (written == 0 && target)
{
parameters.outVariable = written;
return EStreamError::eErrorEndOfStream;
}
total += written;
if (total > parameters.length)
{
// !!!
parameters.outVariable = written;
return EStreamError::eErrorEndOfStream;
}
target -= written;
copy.ptr = copy.Begin<char>() + written;
copy.length -= written;
written = 0;
}
parameters.outVariable = total;
return ret;
}
}

View File

@ -108,7 +108,7 @@ namespace Aurora::Memory
} }
template<typename T> template<typename T>
T *End() const AuConditional_t<Readonly_b, const T*, T*> End() const
{ {
return Begin<T>() + ToCount<T>(); return Begin<T>() + ToCount<T>();
} }

View File

@ -61,10 +61,15 @@ namespace Aurora::Compression
return false; return false;
} }
AuUInt outputStat{}, inputStat{};
AuUInt read = buffer.size(); AuUInt read = buffer.size();
while ((info.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.data(), length)) == AuIO::EStreamError::eErrorNone) && length) while ((info.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.data(), read)) == AuIO::EStreamError::eErrorNone) && read)
{ {
inputStat += read;
ZSTD_inBuffer input = { buffer.data(), read, 0 }; ZSTD_inBuffer input = { buffer.data(), read, 0 };
read = buffer.size();
while (input.pos < input.size) while (input.pos < input.size)
{ {
ZSTD_outBuffer output = { inflatedBuffer.data(), outFrameLength, 0 }; ZSTD_outBuffer output = { inflatedBuffer.data(), outFrameLength, 0 };
@ -76,13 +81,22 @@ namespace Aurora::Compression
return false; return false;
} }
auto copy = output.pos; {
info.writePipe->Write(AuMemory::MemoryViewStreamRead(output.dst, copy)); AuUInt copy = output.pos;
SysAssert(copy == output.pos); auto err = AuIO::WriteAll(info.writePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy));
outputStat += output.pos;
if (copy != output.pos)
{
SysPushErrorIO();
return false;
}
}
if (info.reportProgress) if (info.reportProgress)
{ {
if (!info.reportProgress(input.pos, output.pos)) if (!info.reportProgress(inputStat, outputStat))
{ {
ZSTD_freeDCtx(dctx); ZSTD_freeDCtx(dctx);
return false; return false;
@ -161,14 +175,15 @@ namespace Aurora::Compression
bool status = true; bool status = true;
size_t const toRead = buffInSize; size_t const toRead = buffInSize;
AuUInt outputStat {}, inputStat {};
while (true) while (true)
{ {
AuUInt read = buffInSize; AuUInt read = buffInSize;
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(inflatedBuffer.data(), read)) != AuIO::EStreamError::eErrorNone || !read) if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(inflatedBuffer.data(), read)) != AuIO::EStreamError::eErrorNone)
{ {
break; break;
} }
inputStat += read;
/* Select the flush mode. /* Select the flush mode.
* If the read may not be finished (read == toRead) we use * If the read may not be finished (read == toRead) we use
@ -198,13 +213,21 @@ namespace Aurora::Compression
return false; return false;
} }
auto copy = output.pos; {
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(output.dst, copy)); AuUInt copy = output.pos;
SysAssert(copy == output.pos); auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy));
outputStat += output.pos;
if (copy != output.pos)
{
SysPushErrorIO();
return false;
}
}
if (stream.reportProgress) if (stream.reportProgress)
{ {
if (!stream.reportProgress(input.pos, output.pos)) if (!stream.reportProgress(inputStat, outputStat))
{ {
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
return false; return false;
@ -256,7 +279,9 @@ namespace Aurora::Compression
do do
{ {
AuUInt read = AuArraySize(in); AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) AuIO::EStreamError error;
if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) &&
(error != AuIO::EStreamError::eErrorEndOfStream))
{ {
break; break;
} }
@ -265,7 +290,7 @@ namespace Aurora::Compression
inputStat += read; inputStat += read;
flush = !strm.avail_in ? Z_FINISH : Z_NO_FLUSH; flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? Z_FINISH : Z_NO_FLUSH;
strm.next_in = in; strm.next_in = in;
do do
@ -282,11 +307,18 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out; auto have = AuArraySize(out) - strm.avail_out;
{
AuUInt copy = have; AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have); outputStat += copy;
if (copy != have)
{
SysPushErrorIO();
return false;
}
}
outputStat += have;
if (stream.reportProgress) if (stream.reportProgress)
{ {
if (!stream.reportProgress(inputStat, outputStat)) if (!stream.reportProgress(inputStat, outputStat))
@ -334,7 +366,7 @@ namespace Aurora::Compression
do do
{ {
AuUInt read = AuArraySize(in); AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone)
{ {
break; break;
} }
@ -363,12 +395,18 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out; auto have = AuArraySize(out) - strm.avail_out;
{
AuUInt copy = have; AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have); outputStat += copy;
if (copy != have)
{
SysPushErrorIO();
return false;
}
}
outputStat += have;
if (stream.reportProgress) if (stream.reportProgress)
{ {
if (!stream.reportProgress(inputStat, outputStat)) if (!stream.reportProgress(inputStat, outputStat))
@ -416,17 +454,18 @@ namespace Aurora::Compression
do do
{ {
AuUInt read = AuArraySize(in); AuIO::EStreamError error;
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) AuUInt read{ kChunkSize };
if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) &&
(error != AuIO::EStreamError::eErrorEndOfStream))
{ {
break; break;
} }
strm.avail_in = read; strm.avail_in = read;
inputStat += read; inputStat += read;
flush = !strm.avail_in ? BZ_FINISH : BZ_RUN; flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? BZ_FINISH : BZ_RUN;
strm.next_in = in; strm.next_in = in;
do do
@ -444,12 +483,17 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out; auto have = AuArraySize(out) - strm.avail_out;
{
AuUInt copy = have; AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have); outputStat += copy;
if (copy != have)
outputStat += have; {
SysPushErrorIO();
return false;
}
}
if (stream.reportProgress) if (stream.reportProgress)
{ {
@ -502,7 +546,7 @@ namespace Aurora::Compression
do do
{ {
AuUInt read = AuArraySize(in); AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read) if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone)
{ {
break; break;
} }
@ -531,11 +575,18 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out; auto have = AuArraySize(out) - strm.avail_out;
{
AuUInt copy = have; AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy)); auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have); outputStat += copy;
if (copy != have)
{
SysPushErrorIO();
return false;
}
}
outputStat += have;
if (stream.reportProgress) if (stream.reportProgress)
{ {
if (!stream.reportProgress(inputStat, outputStat)) if (!stream.reportProgress(inputStat, outputStat))
@ -592,14 +643,17 @@ namespace Aurora::Compression
while (true) while (true)
{ {
AuUInt read = maxFrameSize; AuUInt read = maxFrameSize;
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.get(), read)) != AuIO::EStreamError::eErrorNone || !read) AuIO::EStreamError error;
if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.get(), read))) != AuIO::EStreamError::eErrorNone) &&
(error != AuIO::EStreamError::eErrorEndOfStream))
{ {
break; break;
} }
if (read) bool flush = (!read || error == AuIO::EStreamError::eErrorEndOfStream);
if (!flush)
{ {
AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options); AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options);
@ -609,21 +663,30 @@ namespace Aurora::Compression
break; break;
} }
AuUInt copy = bufferedBytes; AuUInt copy = bufferedBytes;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy));
SysAssert(copy == bufferedBytes); outputStat += copy;
if (copy != bufferedBytes)
{
SysPushErrorIO();
return false;
}
outputStat += bufferedBytes;
} }
else else
{ {
AuUInt32 bufferedBytes = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options); AuUInt32 bufferedBytes = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options);
AuUInt copy = bufferedBytes; AuUInt copy = bufferedBytes;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy));
SysAssert(copy == bufferedBytes); outputStat += copy;
if (copy != bufferedBytes)
{
SysPushErrorIO();
return false;
}
break; break;
} }
@ -688,7 +751,7 @@ namespace Aurora::Compression
{ {
AuUInt read = frameSize; AuUInt read = frameSize;
if (pipe.inPipe->Read(AuMemory::MemoryViewStreamWrite(bufferIn.get(), read)) != AuIO::EStreamError::eErrorNone || !read) if (pipe.inPipe->Read(AuMemory::MemoryViewStreamWrite(bufferIn.get(), read)) != AuIO::EStreamError::eErrorNone)
{ {
break; break;
} }
@ -718,12 +781,16 @@ namespace Aurora::Compression
if (frameS2Ptr) if (frameS2Ptr)
{ {
AuUInt copy = frameS2Ptr; AuUInt copy = frameS2Ptr;
pipe.writePipe->Write(AuMemory::MemoryViewStreamRead(bufferOut.get(), copy)); auto err = AuIO::WriteAll(pipe.writePipe.get(), AuMemory::MemoryViewStreamRead(bufferOut.get(), copy));
SysAssert(copy == frameS2Ptr); outputStat += copy;
if (copy != frameS2Ptr)
{
SysPushErrorIO();
return false;
} }
outputStat += frameS2Ptr; }
if (pipe.reportProgress) if (pipe.reportProgress)
{ {