[*] Polyfill legacy compression api into modern io era with IStream[reader/writers]

This commit is contained in:
Reece Wilson 2022-05-28 10:05:25 +01:00
parent 4bf581cd37
commit 25a593cac5
3 changed files with 104 additions and 23 deletions

View File

@ -72,10 +72,22 @@ namespace Aurora::Compression
AuUInt32 internalStreamSize {4096};
AuUInt8 threads {1};
inline CompressionInfo(ECompressionType alg) : type(alg)
{
}
inline CompressionInfo(ECompressionType alg, AuUInt32 bufferSize) : type(alg), internalStreamSize(bufferSize)
{
}
; };
struct DecompressInfo
{
AU_COPY_MOVE(DecompressInfo);
/**
* @brief algorithm
*/
@ -96,12 +108,12 @@ namespace Aurora::Compression
*/
AuInt8 windowBits {15};
DecompressInfo(ECompressionType alg) : alg(alg)
inline DecompressInfo(ECompressionType alg) : alg(alg)
{
}
DecompressInfo(ECompressionType alg, AuUInt32 bufferSize) : alg(alg), internalStreamSize(bufferSize)
inline DecompressInfo(ECompressionType alg, AuUInt32 bufferSize) : alg(alg), internalStreamSize(bufferSize)
{
}

View File

@ -7,6 +7,12 @@
***/
#pragma once
namespace Aurora::IO
{
struct IStreamReader;
struct IStreamWriter;
}
namespace Aurora::Compression
{
struct CompressionPipe
@ -17,10 +23,10 @@ namespace Aurora::Compression
AuUInt32 threads {1};
/// consume from stream callback
AuFunction<AuUInt(void *, AuUInt)> inPipe;
AuSPtr<Aurora::IO::IStreamReader> inPipe;
/// write to stream callback
AuFunction<void(const void *, AuUInt)> writePipe;
AuSPtr<Aurora::IO::IStreamWriter> writePipe;
/// preemption and reporting
AuFunction<bool(AuUInt, AuUInt)> reportProgress;

View File

@ -62,7 +62,7 @@ namespace Aurora::Compression
}
AuUInt read = buffer.size();
while (read = (info.inPipe(buffer.data(), length)))
while ((info.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.data(), length)) == AuIO::EStreamError::eErrorNone) && length)
{
ZSTD_inBuffer input = { buffer.data(), read, 0 };
while (input.pos < input.size)
@ -76,7 +76,10 @@ namespace Aurora::Compression
return false;
}
info.writePipe(output.dst, output.pos);
auto copy = output.pos;
info.writePipe->Write(AuMemory::MemoryViewStreamRead(output.dst, copy));
SysAssert(copy == output.pos);
if (info.reportProgress)
{
if (!info.reportProgress(input.pos, output.pos))
@ -161,7 +164,12 @@ namespace Aurora::Compression
while (true)
{
size_t read = stream.inPipe(inflatedBuffer.data(), buffInSize);
AuUInt read = buffInSize;
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(inflatedBuffer.data(), read)) != AuIO::EStreamError::eErrorNone || !read)
{
break;
}
/* Select the flush mode.
* If the read may not be finished (read == toRead) we use
* ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end.
@ -190,7 +198,9 @@ namespace Aurora::Compression
return false;
}
stream.writePipe(output.dst, output.pos);
auto copy = output.pos;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(output.dst, copy));
SysAssert(copy == output.pos);
if (stream.reportProgress)
{
@ -245,7 +255,12 @@ namespace Aurora::Compression
do
{
auto read = stream.inPipe(in, AuArraySize(in));
AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read)
{
break;
}
strm.avail_in = read;
inputStat += read;
@ -267,7 +282,9 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out;
stream.writePipe(out, have);
AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have);
outputStat += have;
if (stream.reportProgress)
@ -316,7 +333,12 @@ namespace Aurora::Compression
do
{
auto read = stream.inPipe(in, AuArraySize(in));
AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read)
{
break;
}
inputStat += read;
if (!read)
{
@ -341,7 +363,10 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out;
stream.writePipe(out, have);
AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have);
outputStat += have;
if (stream.reportProgress)
@ -391,7 +416,12 @@ namespace Aurora::Compression
do
{
auto read = stream.inPipe(in, AuArraySize(in));
AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read)
{
break;
}
strm.avail_in = read;
inputStat += read;
@ -414,7 +444,10 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out;
stream.writePipe(out, have);
AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have);
outputStat += have;
@ -468,7 +501,12 @@ namespace Aurora::Compression
do
{
auto read = stream.inPipe(in, AuArraySize(in));
AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone || !read)
{
break;
}
inputStat += read;
if (!read)
{
@ -493,7 +531,9 @@ namespace Aurora::Compression
auto have = AuArraySize(out) - strm.avail_out;
stream.writePipe(out, have);
AuUInt copy = have;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(out, copy));
SysAssert(copy == have);
outputStat += have;
if (stream.reportProgress)
@ -544,13 +584,20 @@ namespace Aurora::Compression
// Write header
{
auto written = LZ4F_compressBegin(cctxPtr, header, AuArraySize(header), &pref);
stream.writePipe(header, written);
AuUInt copy = written;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(header, copy));
SysAssert(copy == written);
}
while (true)
{
auto read = stream.inPipe(buffer.get(), maxFrameSize);
inputStat += read;
AuUInt read = maxFrameSize;
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.get(), read)) != AuIO::EStreamError::eErrorNone || !read)
{
break;
}
if (read)
{
@ -562,14 +609,22 @@ namespace Aurora::Compression
break;
}
stream.writePipe(outBuffer.get(), bufferedBytes);
AuUInt copy = bufferedBytes;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(outBuffer.get(), copy));
SysAssert(copy == bufferedBytes);
outputStat += bufferedBytes;
}
else
{
AuUInt32 bufferedBytes = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options);
stream.writePipe(outBuffer.get(), bufferedBytes);
AuUInt copy = bufferedBytes;
stream.writePipe->Write(AuMemory::MemoryViewStreamRead(outBuffer.get(), copy));
SysAssert(copy == bufferedBytes);
break;
}
@ -631,7 +686,13 @@ namespace Aurora::Compression
if (frameSize)
{
auto read = pipe.inPipe(bufferIn.get(), frameSize);
AuUInt read = frameSize;
if (pipe.inPipe->Read(AuMemory::MemoryViewStreamWrite(bufferIn.get(), read)) != AuIO::EStreamError::eErrorNone || !read)
{
break;
}
if (read != frameSize)
{
ret = read == 0 && outputStat;
@ -656,7 +717,9 @@ namespace Aurora::Compression
if (frameS2Ptr)
{
pipe.writePipe(bufferOut.get(), frameS2Ptr);
AuUInt copy = frameS2Ptr;
pipe.writePipe->Write(AuMemory::MemoryViewStreamRead(bufferOut.get(), copy));
SysAssert(copy == frameS2Ptr);
}
outputStat += frameS2Ptr;