AuroraRuntime/Source/Compression/StreamCompression.cpp
Reece Wilson ad4c18abe7 [+] Missing LZ4 compressor
[*] Various compression related bugs under the 1/4th of the AuCompression platform related to compression stream objects. Now all 1/4ths match up.
2022-07-20 21:09:40 +01:00

896 lines
25 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: StreamCompression.cpp
Date: 2021-6-17
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Compression.hpp"
#include "StreamCompression.hpp"
#if defined(_AUHAS_BZIP2)
#include "bzlib.h"
#endif
#include "zstd.h"
#include "zlib.h"
#if defined(_AUHAS_LZ4)
#include "lz4.h"
#include "lz4frame.h"
#endif
namespace Aurora::Compression
{
static bool DecompressZSTD(const CompressionPipe &info)
{
Memory::ByteBuffer buffer;
Memory::ByteBuffer inflatedBuffer;
if (!info.writePipe)
{
return false;
}
if (!info.inPipe)
{
return false;
}
auto length = ZSTD_DStreamInSize();
auto ok = AuTryResize(buffer, length);
if (!ok)
{
SysPushErrorMem("Couldn't reserve inflation buffers");
return false;
}
auto outFrameLength = ZSTD_DStreamOutSize();
ok = AuTryResize(inflatedBuffer, outFrameLength);
if (!ok)
{
SysPushErrorMem("Couldn't reserve inflation buffers");
return false;
}
auto dctx = ZSTD_createDCtx();
if (!dctx)
{
SysPushErrorGeneric("Couldn't create decompressor");
return false;
}
AuUInt outputStat{}, inputStat{};
AuUInt read = buffer.size();
while ((info.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.data(), read)) == AuIO::EStreamError::eErrorNone) && read)
{
inputStat += read;
ZSTD_inBuffer input = { buffer.data(), read, 0 };
read = buffer.size();
while (input.pos < input.size)
{
ZSTD_outBuffer output = { inflatedBuffer.data(), outFrameLength, 0 };
auto ret = ZSTD_decompressStream(dctx, &output, &input);
if (ZSTD_isError(ret))
{
SysPushErrorIO("Compression error: {}", ret);
ZSTD_freeDCtx(dctx);
return false;
}
{
AuUInt copy = output.pos;
auto err = AuIO::WriteAll(info.writePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy));
outputStat += output.pos;
if (copy != output.pos)
{
SysPushErrorIO();
return false;
}
}
if (info.reportProgress)
{
if (!info.reportProgress(inputStat, outputStat))
{
ZSTD_freeDCtx(dctx);
return false;
}
}
}
}
ZSTD_freeDCtx(dctx);
return true;
}
/**
 * Compresses everything readable from stream.inPipe into a single ZSTD frame
 * written to stream.writePipe.
 *
 * The encoder is configured with info.compressionLevel, frame checksums
 * enabled, and AuMax(stream.threads, 1u) workers. When stream.reportProgress
 * is set it is invoked after every flushed block with the running
 * input/output byte totals; returning false from it aborts the operation.
 *
 * @return false if a pipe is missing, a buffer or the encoder context cannot
 *         be allocated, a parameter is rejected, zstd reports an error, a
 *         write comes up short, or the progress callback aborts; true
 *         otherwise.
 *
 * NOTE(review): a Read result other than eErrorNone leaves the loop before
 * ZSTD_e_end has been issued and the function still returns true - confirm
 * this is the intended contract with the reader.
 */
static bool CompressZSTD(const CompressionPipe &stream, const CompressionInfo &info)
{
    Memory::ByteBuffer inflatedBuffer;
    Memory::ByteBuffer deflatedBuffer;
    size_t ret;

    const auto buffInSize = ZSTD_CStreamInSize();
    const auto buffOutSize = ZSTD_CStreamOutSize();

    if (!stream.writePipe)
    {
        return false;
    }

    if (!stream.inPipe)
    {
        return false;
    }

    auto ok = AuTryResize(inflatedBuffer, buffInSize);
    if (!ok)
    {
        SysPushErrorMem("Couldn't reserve deflation buffers");
        return false;
    }

    ok = AuTryResize(deflatedBuffer, buffOutSize);
    if (!ok)
    {
        SysPushErrorMem("Couldn't reserve deflation buffers. Out of memory");
        return false;
    }

    auto cctx = ZSTD_createCCtx();
    if (!cctx)
    {
        SysPushErrorGeneric("Couldn't create compressor");
        return false;
    }

    ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, info.compressionLevel);
    if (ZSTD_isError(ret))
    {
        SysPushErrorGen("Invalid compression level");
        ZSTD_freeCCtx(cctx);
        return false;
    }

    ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
    if (ZSTD_isError(ret))
    {
        SysPushErrorGen("Invalid option");
        ZSTD_freeCCtx(cctx);
        return false;
    }

    ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, AuMax(stream.threads, 1u));
    if (ZSTD_isError(ret))
    {
        SysPushErrorGen();
        ZSTD_freeCCtx(cctx);
        return false;
    }

    size_t const toRead = buffInSize;
    AuUInt outputStat {}, inputStat {};

    while (true)
    {
        AuUInt read = buffInSize;
        if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(inflatedBuffer.data(), read)) != AuIO::EStreamError::eErrorNone)
        {
            break;
        }

        inputStat += read;

        /* Select the flush mode.
         * If the read may not be finished (read == toRead) we use
         * ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end.
         * Zstd optimizes the case where the first flush mode is ZSTD_e_end,
         * since it knows it is compressing the entire source in one pass.
         */
        int const lastChunk = (read < toRead);
        ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;

        /* Set the input buffer to what we just read.
         * We compress until the input buffer is empty, each time flushing the
         * output.
         */
        ZSTD_inBuffer input = { inflatedBuffer.data(), read, 0 };
        int finished;

        do
        {
            /* Compress into the output buffer and write all of the output to
             * the pipe so we can reuse the buffer next iteration.
             */
            ZSTD_outBuffer output = { deflatedBuffer.data(), buffOutSize, 0 };

            size_t const remaining = ZSTD_compressStream2(cctx, &output, &input, mode);
            if (ZSTD_isError(remaining))
            {
                SysPushErrorGen(": {}", ZSTD_getErrorName(remaining));
                ZSTD_freeCCtx(cctx);
                return false;
            }

            {
                AuUInt copy = output.pos;
                // Success is judged by the byte count written back into `copy`.
                [[maybe_unused]] const auto writeStatus = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy));
                outputStat += output.pos;

                if (copy != output.pos)
                {
                    SysPushErrorIO();
                    // Previously leaked: the context must be released on this path too.
                    ZSTD_freeCCtx(cctx);
                    return false;
                }
            }

            if (stream.reportProgress)
            {
                if (!stream.reportProgress(inputStat, outputStat))
                {
                    ZSTD_freeCCtx(cctx);
                    return false;
                }
            }

            /* If we're on the last chunk we're finished when zstd returns 0,
             * which means it has consumed all the input AND finished the frame.
             * Otherwise, we're finished when we've consumed all the input.
             */
            finished = lastChunk ? (remaining == 0) : (input.pos == input.size);
        }
        while (!finished);

        SysAssertExp(input.pos == input.size);

        if (lastChunk)
        {
            break;
        }
    }

    ZSTD_freeCCtx(cctx);
    return true;
}
static bool CompressZLib(const CompressionPipe &stream, AuUInt8 level, AuInt8 bits)
{
int ret, flush;
z_stream strm {};
unsigned char in[kChunkSize];
unsigned char out[kChunkSize];
if (!stream.writePipe)
{
return false;
}
if (!stream.inPipe)
{
return false;
}
ret = deflateInit2(&strm, level, Z_DEFLATED, bits, 8, Z_DEFAULT_STRATEGY);
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
}
AuUInt32 inputStat = 0, outputStat = 0;
do
{
AuUInt read = AuArraySize(in);
AuIO::EStreamError error;
if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) &&
(error != AuIO::EStreamError::eErrorEndOfStream))
{
break;
}
strm.avail_in = read;
inputStat += read;
flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? Z_FINISH : Z_NO_FLUSH;
strm.next_in = in;
do
{
strm.avail_out = AuArraySize(out);
strm.next_out = out;
ret = deflate(&strm, flush);
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
}
auto have = AuArraySize(out) - strm.avail_out;
{
AuUInt copy = have;
auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
outputStat += copy;
if (copy != have)
{
SysPushErrorIO();
return false;
}
}
if (stream.reportProgress)
{
if (!stream.reportProgress(inputStat, outputStat))
{
deflateEnd(&strm);
return false;
}
}
} while (strm.avail_out == 0);
SysAssert(strm.avail_in == 0);
} while (flush != Z_FINISH);
deflateEnd(&strm);
return true;
}
static bool DecompressZLib(const CompressionPipe &stream, int bits)
{
int ret;
z_stream strm {};
unsigned char in[kChunkSize];
unsigned char out[kChunkSize];
if (!stream.writePipe)
{
return false;
}
if (!stream.inPipe)
{
return false;
}
ret = inflateInit2(&strm, bits);
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
}
AuUInt32 inputStat = 0, outputStat = 0;
do
{
AuUInt read = AuArraySize(in);
if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone)
{
break;
}
inputStat += read;
if (!read)
{
break;
}
strm.avail_in = read;
strm.next_in = in;
do
{
strm.avail_out = AuArraySize(out);
strm.next_out = out;
ret = inflate(&strm, Z_NO_FLUSH);
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
inflateEnd(&strm);
return false;
}
auto have = AuArraySize(out) - strm.avail_out;
{
AuUInt copy = have;
auto err = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
outputStat += copy;
if (copy != have)
{
SysPushErrorIO();
return false;
}
}
if (stream.reportProgress)
{
if (!stream.reportProgress(inputStat, outputStat))
{
inflateEnd(&strm);
return false;
}
}
} while (strm.avail_out == 0);
SysAssert(strm.avail_in == 0);
} while (ret != Z_STREAM_END);
inflateEnd(&strm);
return true;
}
/**
 * Compresses everything readable from stream.inPipe into a bzip2 stream
 * written to stream.writePipe. Always returns false when the runtime is
 * built without _AUHAS_BZIP2.
 *
 * info.compressionLevel is passed to BZ2_bzCompressInit as the block size.
 * BZ_FINISH is issued once a read yields zero bytes or the reader reports
 * eErrorEndOfStream. When stream.reportProgress is set it is invoked after
 * every BZ2_bzCompress call with the running input/output byte totals;
 * returning false aborts.
 *
 * @return false if a pipe is missing, bzip2 reports an error, a write comes
 *         up short, or the progress callback aborts; true otherwise.
 *
 * NOTE(review): any other read error leaves the loop without finishing the
 * stream and the function still returns true - confirm this is intended.
 */
static bool CompressBZip2(const CompressionPipe &stream, const CompressionInfo &info)
{
#if defined(_AUHAS_BZIP2)
    int ret, flush;
    bz_stream strm {};
    char in[kChunkSize];
    char out[kChunkSize];

    if (!stream.writePipe)
    {
        return false;
    }

    if (!stream.inPipe)
    {
        return false;
    }

    ret = BZ2_bzCompressInit(&strm, info.compressionLevel, 0, 0);
    if (ret < BZ_OK)
    {
        SysPushErrorIO("Error: {}", ret);
        return false;
    }

    // Pointer-sized counters, matching the ZSTD paths, so that progress
    // totals do not wrap at 4 GiB on 64-bit targets.
    AuUInt inputStat {}, outputStat {};

    do
    {
        AuIO::EStreamError error;
        AuUInt read { kChunkSize };

        if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) &&
            (error != AuIO::EStreamError::eErrorEndOfStream))
        {
            break;
        }

        strm.avail_in = read;
        inputStat += read;

        flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? BZ_FINISH : BZ_RUN;
        strm.next_in = in;

        do
        {
            strm.avail_out = AuArraySize(out);
            strm.next_out = out;

            ret = BZ2_bzCompress(&strm, flush);
            if (ret < BZ_OK)
            {
                SysPushErrorIO("Error: {}", ret);
                BZ2_bzCompressEnd(&strm);
                return false;
            }

            const AuUInt have = AuArraySize(out) - strm.avail_out;

            {
                AuUInt copy = have;
                // Success is judged by the byte count written back into `copy`.
                [[maybe_unused]] const auto writeStatus = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
                outputStat += copy;

                if (copy != have)
                {
                    SysPushErrorIO();
                    // Previously leaked: release the bzip2 state here too.
                    BZ2_bzCompressEnd(&strm);
                    return false;
                }
            }

            if (stream.reportProgress)
            {
                if (!stream.reportProgress(inputStat, outputStat))
                {
                    BZ2_bzCompressEnd(&strm);
                    return false;
                }
            }
        }
        // Termination follows the bzip2 state machine instead of avail_out:
        //  - BZ_RUN:    stop as soon as the input is consumed; calling again
        //               with nothing to do yields BZ_PARAM_ERROR. Pending
        //               output is emitted by a later call.
        //  - BZ_FINISH: keep draining until BZ_STREAM_END; calling again
        //               after that yields BZ_SEQUENCE_ERROR.
        while ((flush == BZ_FINISH) ? (ret != BZ_STREAM_END) : (strm.avail_in != 0));

        SysAssert(strm.avail_in == 0);
    }
    while (flush != BZ_FINISH);

    BZ2_bzCompressEnd(&strm);
    return true;
#else
    return false;
#endif
}
/**
 * Decompresses a bzip2 stream read from stream.inPipe into stream.writePipe.
 * Always returns false when the runtime is built without _AUHAS_BZIP2.
 *
 * Data moves through two kChunkSize stack buffers. The loop ends when bzip2
 * reports BZ_STREAM_END, when a read yields zero bytes, or when the reader
 * reports anything other than eErrorNone. When stream.reportProgress is set
 * it is invoked after every BZ2_bzDecompress call with the running
 * input/output byte totals; returning false aborts.
 *
 * @return false if a pipe is missing, bzip2 reports an error, a write comes
 *         up short, or the progress callback aborts; true otherwise.
 */
static bool DecompressBZip2(const CompressionPipe &stream)
{
#if defined(_AUHAS_BZIP2)
    int ret;
    bz_stream strm {};
    char in[kChunkSize];
    char out[kChunkSize];

    if (!stream.writePipe)
    {
        return false;
    }

    if (!stream.inPipe)
    {
        return false;
    }

    ret = BZ2_bzDecompressInit(&strm, 0, 0);
    if (ret < BZ_OK)
    {
        SysPushErrorIO("Error: {}", ret);
        return false;
    }

    // Pointer-sized counters, matching the ZSTD paths, so that progress
    // totals do not wrap at 4 GiB on 64-bit targets.
    AuUInt inputStat {}, outputStat {};

    do
    {
        AuUInt read = AuArraySize(in);
        if (stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone)
        {
            break;
        }

        inputStat += read;

        if (!read)
        {
            break;
        }

        strm.avail_in = read;
        strm.next_in = in;

        do
        {
            strm.avail_out = AuArraySize(out);
            strm.next_out = out;

            ret = BZ2_bzDecompress(&strm);
            if (ret < BZ_OK)
            {
                SysPushErrorIO("Error: {}", ret);
                BZ2_bzDecompressEnd(&strm);
                return false;
            }

            const AuUInt have = AuArraySize(out) - strm.avail_out;

            {
                AuUInt copy = have;
                // Success is judged by the byte count written back into `copy`.
                [[maybe_unused]] const auto writeStatus = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(out, copy));
                outputStat += copy;

                if (copy != have)
                {
                    SysPushErrorIO();
                    // Previously leaked: release the bzip2 state here too.
                    BZ2_bzDecompressEnd(&strm);
                    return false;
                }
            }

            if (stream.reportProgress)
            {
                if (!stream.reportProgress(inputStat, outputStat))
                {
                    BZ2_bzDecompressEnd(&strm);
                    return false;
                }
            }
        }
        // Do not call the decoder again once it reported BZ_STREAM_END, even
        // if that final call happened to fill `out` exactly; a further call
        // would fail with BZ_SEQUENCE_ERROR.
        while ((strm.avail_out == 0) && (ret != BZ_STREAM_END));

        SysAssert(strm.avail_in == 0);
    }
    // BZ2_bzDecompress signals completion with BZ_STREAM_END. BZ_FINISH is a
    // compressor *action* code and is never returned by the decoder.
    while (ret != BZ_STREAM_END);

    BZ2_bzDecompressEnd(&strm);
    return true;
#else
    return false;
#endif
}
/**
 * Compresses everything readable from stream.inPipe into a single LZ4 frame
 * written to stream.writePipe. Always returns false when the runtime is
 * built without _AUHAS_LZ4.
 *
 * Input is read in chunks of info.lz4BlockSize bytes (64 KiB when that field
 * is zero) and encoded with info.compressionLevel and autoFlush enabled. The
 * frame is closed once a read yields zero bytes or the reader reports
 * eErrorEndOfStream. When stream.reportProgress is set it is invoked after
 * every non-final chunk with the running input/output byte totals; returning
 * false aborts.
 *
 * @return false if a pipe is missing, the context cannot be created, LZ4
 *         reports an error, a write comes up short, or the progress callback
 *         aborts; true otherwise.
 *
 * NOTE(review): any other read error leaves the loop without closing the
 * frame and the function still returns true - confirm this is intended.
 */
static bool CompressLZ4(const CompressionPipe &stream, const CompressionInfo &info)
{
#if defined(_AUHAS_LZ4)
    bool ret = true;
    LZ4F_cctx *cctxPtr {};
    LZ4F_preferences_t pref {};
    LZ4F_compressOptions_t options {};
    AuUInt inputStat {}, outputStat {};
    char header[512];

    if (!stream.writePipe)
    {
        return false;
    }

    if (!stream.inPipe)
    {
        return false;
    }

    pref.compressionLevel = info.compressionLevel;
    pref.autoFlush = true;

    auto maxFrameSize = info.lz4BlockSize ? info.lz4BlockSize : 64 * 1024;
    auto buffer = AuSPtr<char>(new char[maxFrameSize], AuDefaultDeleter<char[]>());
    auto maxOut = LZ4F_compressBound(maxFrameSize, &pref);
    auto outBuffer = AuSPtr<char>(new char[maxOut], AuDefaultDeleter<char[]>());

    auto err = LZ4F_createCompressionContext(&cctxPtr, LZ4F_getVersion());
    if (LZ4F_isError(err))
    {
        return false;
    }

    // Write header
    {
        const size_t headerLength = LZ4F_compressBegin(cctxPtr, header, AuArraySize(header), &pref);
        if (LZ4F_isError(headerLength))
        {
            SysPushErrorIO("{}", LZ4F_getErrorName(headerLength));
            LZ4F_freeCompressionContext(cctxPtr);
            return false;
        }

        AuUInt copy = headerLength;
        [[maybe_unused]] const auto writeStatus = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(header, copy));
        outputStat += copy;

        if (copy != headerLength)
        {
            SysPushErrorIO();
            LZ4F_freeCompressionContext(cctxPtr);
            return false;
        }
    }

    while (true)
    {
        AuUInt read = maxFrameSize;
        AuIO::EStreamError error;

        if (((error = stream.inPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.get(), read))) != AuIO::EStreamError::eErrorNone) &&
            (error != AuIO::EStreamError::eErrorEndOfStream))
        {
            break;
        }

        const bool lastChunk = (!read || error == AuIO::EStreamError::eErrorEndOfStream);
        inputStat += read;

        // Encode whatever this read delivered, including bytes that arrive
        // together with the end-of-stream signal (the zlib/bzip2 paths feed
        // those bytes to the encoder as well).
        if (read)
        {
            // Keep the result as size_t: LZ4F error codes live at the top of
            // the size_t range and are lost if truncated to 32 bits.
            const size_t produced = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options);
            if (LZ4F_isError(produced))
            {
                SysPushErrorIO("{}", LZ4F_getErrorName(produced));
                ret = false;
                break;
            }

            AuUInt copy = produced;
            [[maybe_unused]] const auto writeStatus = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy));
            outputStat += copy;

            if (copy != produced)
            {
                SysPushErrorIO();
                ret = false;
                break;
            }
        }

        if (lastChunk)
        {
            // Flush anything buffered and emit the frame end mark.
            const size_t produced = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options);
            if (LZ4F_isError(produced))
            {
                SysPushErrorIO("{}", LZ4F_getErrorName(produced));
                ret = false;
                break;
            }

            AuUInt copy = produced;
            [[maybe_unused]] const auto writeStatus = AuIO::WriteAll(stream.writePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy));
            outputStat += copy;

            if (copy != produced)
            {
                SysPushErrorIO();
                ret = false;
            }

            break;
        }

        if (stream.reportProgress)
        {
            if (!stream.reportProgress(inputStat, outputStat))
            {
                ret = false;
                break;
            }
        }
    }

    // Single release point: every exit after context creation funnels here.
    LZ4F_freeCompressionContext(cctxPtr);
    return ret;
#else
    return false;
#endif
}
/**
 * Decompresses LZ4 frame data read from pipe.inPipe into pipe.writePipe.
 * Always returns false when the runtime is built without _AUHAS_LZ4.
 *
 * @param pipe          Source/sink pipes plus the optional progress callback.
 * @param frameMaxSize  Size in bytes of both scratch buffers; also the upper
 *                      bound on how many input bytes are requested per read.
 *
 * Each iteration reads exactly the number of bytes LZ4F_decompress hinted at
 * on the previous call (LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH when there is no
 * hint) and feeds them to the decoder until all of them are consumed. When
 * pipe.reportProgress is set it is invoked once per iteration with the
 * running input/output byte totals; returning false aborts.
 *
 * @return false if a pipe is missing, the context or buffers cannot be
 *         created, the hinted read size exceeds frameMaxSize, LZ4 reports an
 *         error, a write comes up short, the progress callback aborts, or a
 *         read comes up short while there is either nothing decoded yet or a
 *         non-zero number of bytes was delivered. true otherwise.
 */
static bool DecompressLZ4(const CompressionPipe &pipe, AuUInt32 frameMaxSize)
{
#if defined(_AUHAS_LZ4)
    bool ret = true;
    LZ4F_dctx *dctxPtr {};
    AuUInt inputStat {}, outputStat {};
    size_t bytesRemInFrame {};
    LZ4F_decompressOptions_t opts {};
    AuSPtr<char> bufferIn, bufferOut;

    if (!pipe.writePipe)
    {
        return false;
    }

    if (!pipe.inPipe)
    {
        return false;
    }

    auto err = LZ4F_createDecompressionContext(&dctxPtr, LZ4F_getVersion());
    if (LZ4F_isError(err))
    {
        return false;
    }

    bufferIn = AuMakeSharedArray<char>(frameMaxSize);
    bufferOut = AuMakeSharedArray<char>(frameMaxSize);
    if (!bufferIn || !bufferOut)
    {
        SysPushErrorMem("Couldn't reserve inflation buffers");
        LZ4F_freeDecompressionContext(dctxPtr);
        return false;
    }

    while (ret)
    {
        // Never zero: either the decoder's hint or the minimum header probe.
        const size_t frameSize = bytesRemInFrame ? bytesRemInFrame : LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH;

        if (frameSize > frameMaxSize)
        {
            ret = false;
            break;
        }

        AuUInt read = frameSize;
        if (pipe.inPipe->Read(AuMemory::MemoryViewStreamWrite(bufferIn.get(), read)) != AuIO::EStreamError::eErrorNone)
        {
            break;
        }

        if (read != frameSize)
        {
            // A clean end of input is only a success once something has
            // actually been decoded; a partial read is always a failure.
            ret = read == 0 && outputStat;
            break;
        }

        inputStat += frameSize;

        // LZ4F_decompress may consume fewer bytes than offered (typically
        // when the destination buffer fills up); keep calling it until the
        // whole chunk has been accepted instead of dropping the remainder.
        size_t consumed = 0;
        while (consumed < frameSize)
        {
            size_t srcLength = frameSize - consumed;
            size_t dstLength = frameMaxSize;

            bytesRemInFrame = LZ4F_decompress(dctxPtr, bufferOut.get(), &dstLength, bufferIn.get() + consumed, &srcLength, &opts);
            if (LZ4F_isError(bytesRemInFrame))
            {
                SysPushErrorIO("{}", LZ4F_getErrorName(bytesRemInFrame));
                ret = false;
                break;
            }

            if (!srcLength && !dstLength)
            {
                // Neither input consumed nor output produced: bail out
                // rather than spin forever.
                SysPushErrorIO();
                ret = false;
                break;
            }

            consumed += srcLength;

            if (dstLength)
            {
                AuUInt copy = dstLength;
                // Success is judged by the byte count written back into `copy`.
                [[maybe_unused]] const auto writeStatus = AuIO::WriteAll(pipe.writePipe.get(), AuMemory::MemoryViewStreamRead(bufferOut.get(), copy));
                outputStat += copy;

                if (copy != dstLength)
                {
                    SysPushErrorIO();
                    ret = false;
                    break;
                }
            }
        }

        if (!ret)
        {
            break;
        }

        if (pipe.reportProgress)
        {
            if (!pipe.reportProgress(inputStat, outputStat))
            {
                ret = false;
                break;
            }
        }
    }

    // Single release point: every exit after context creation funnels here.
    LZ4F_freeDecompressionContext(dctxPtr);
    return ret;
#else
    return false;
#endif
}
/**
 * Public entry point: compresses the contents of stream.inPipe into
 * stream.writePipe with the algorithm selected by info.type.
 *
 * Deflate, Zip and GZip all route through the zlib backend after
 * CompressionLevelFromExternalApi has produced the window-bits value; ZSTD,
 * BZIP2 and LZ4 go to their dedicated backends.
 *
 * @return false when either pipe is missing, the zlib parameters cannot be
 *         derived, or the algorithm is not handled here; otherwise whatever
 *         the selected backend returns.
 *
 * NOTE(review): the zlib level is taken from `info`, not from the `meta` copy
 * handed to CompressionLevelFromExternalApi - confirm that is intended.
 */
AUKN_SYM bool Compress(const CompressionPipe &stream, const CompressionInfo &info)
{
    CompressionInfo meta = info;
    AuInt8 bits;

    if (!stream.inPipe || !stream.writePipe)
    {
        return false;
    }

    const auto algorithm = info.type;

    const bool isZLibFamily = (algorithm == ECompressionType::eDeflate) ||
                              (algorithm == ECompressionType::eZip) ||
                              (algorithm == ECompressionType::eGZip);
    if (isZLibFamily)
    {
        if (!CompressionLevelFromExternalApi(meta, bits))
        {
            return false;
        }

        return CompressZLib(stream, info.compressionLevel, bits);
    }

    if (algorithm == ECompressionType::eZSTD)
    {
        return CompressZSTD(stream, info);
    }

    if (algorithm == ECompressionType::eBZIP2)
    {
        return CompressBZip2(stream, info);
    }

    if (algorithm == ECompressionType::eLZ4)
    {
        return CompressLZ4(stream, info);
    }

    // ECompressionType::eLZMA is not wired up:
    //     return CompressLZMA(stream, info);
    return false;
}
/**
 * Public entry point: decompresses the contents of pipe.inPipe into
 * pipe.writePipe with the algorithm selected by meta2.alg.
 *
 * Deflate, Zip and GZip all route through the zlib backend after
 * CompressionLevelFromExternalApi has produced the window-bits value; ZSTD
 * and BZIP2 go to their dedicated backends; LZ4 is given half of
 * internalStreamSize as its scratch-buffer size.
 *
 * @return false when either pipe is missing, the zlib parameters cannot be
 *         derived, or the algorithm is not handled here; otherwise whatever
 *         the selected backend returns.
 */
AUKN_SYM bool Decompress(const CompressionPipe &pipe, const DecompressInfo &meta2)
{
    DecompressInfo meta = meta2;
    AuInt8 bits;

    if (!pipe.inPipe || !pipe.writePipe)
    {
        return false;
    }

    const auto algorithm = meta.alg;

    const bool isZLibFamily = (algorithm == ECompressionType::eDeflate) ||
                              (algorithm == ECompressionType::eZip) ||
                              (algorithm == ECompressionType::eGZip);
    if (isZLibFamily)
    {
        if (!CompressionLevelFromExternalApi(meta, bits))
        {
            return false;
        }

        return DecompressZLib(pipe, bits);
    }

    if (algorithm == ECompressionType::eZSTD)
    {
        return DecompressZSTD(pipe);
    }

    if (algorithm == ECompressionType::eBZIP2)
    {
        return DecompressBZip2(pipe);
    }

    if (algorithm == ECompressionType::eLZ4)
    {
        return DecompressLZ4(pipe, meta.internalStreamSize / 2);
    }

    // ECompressionType::eLZMA is not wired up:
    //     return DecompressLZMA(pipe);
    return false;
}
}