/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: StreamCompression.cpp Date: 2021-6-17 Author: Reece ***/ #include #include "Compression.hpp" #include "StreamCompression.hpp" #if defined(_AUHAS_BZIP2) #include "bzlib.h" #endif #include "zstd.h" #include "zlib.h" #if defined(_AUHAS_LZ4) #include "lz4.h" #include "lz4frame.h" #endif namespace Aurora::Compression { static bool DecompressZSTD(const CompressionPipe &info) { Memory::ByteBuffer buffer; Memory::ByteBuffer inflatedBuffer; if (!info.pWritePipe) { return false; } if (!info.pReadPipe) { return false; } auto length = ZSTD_DStreamInSize(); auto ok = AuTryResize(buffer, length); if (!ok) { SysPushErrorMem("Couldn't reserve inflation buffers"); return false; } auto outFrameLength = ZSTD_DStreamOutSize(); ok = AuTryResize(inflatedBuffer, outFrameLength); if (!ok) { SysPushErrorMem("Couldn't reserve inflation buffers"); return false; } auto dctx = ZSTD_createDCtx(); if (!dctx) { SysPushErrorGeneric("Couldn't create decompressor"); return false; } AuUInt outputStat{}, inputStat{}; AuUInt read = buffer.size(); while ((info.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.data(), read)) == AuIO::EStreamError::eErrorNone) && read) { inputStat += read; ZSTD_inBuffer input = { buffer.data(), read, 0 }; read = buffer.size(); while (input.pos < input.size) { ZSTD_outBuffer output = { inflatedBuffer.data(), outFrameLength, 0 }; auto ret = ZSTD_decompressStream(dctx, &output, &input); if (ZSTD_isError(ret)) { SysPushErrorIO("Compression error: {}", ret); ZSTD_freeDCtx(dctx); return false; } { AuUInt copy = output.pos; auto err = AuIO::WriteAll(info.pWritePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy)); outputStat += output.pos; if (copy != output.pos) { SysPushErrorIO(); return false; } } if (info.reportProgress) { if (!info.reportProgress(inputStat, outputStat)) { ZSTD_freeDCtx(dctx); return false; } } } } ZSTD_freeDCtx(dctx); return true; } static bool CompressZSTD(const CompressionPipe &stream, const CompressionInfo &info) { Memory::ByteBuffer inflatedBuffer; Memory::ByteBuffer deflatedBuffer; size_t ret; const auto buffInSize = ZSTD_CStreamInSize(); const auto buffOutSize = ZSTD_CStreamOutSize(); if (!stream.pWritePipe) { return false; } if (!stream.pReadPipe) { return false; } auto ok = AuTryResize(inflatedBuffer, buffInSize); if (!ok) { SysPushErrorMem("Couldn't reserve deflation buffers"); return false; } ok = AuTryResize(deflatedBuffer, buffOutSize); if (!ok) { SysPushErrorMem("Couldn't reserve deflation buffers. Out of memory"); return false; } auto cctx = ZSTD_createCCtx(); if (!cctx) { SysPushErrorGeneric("Couldn't create decompressor"); return false; } ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, info.compressionLevel); if (ZSTD_isError(ret)) { SysPushErrorGen("Invalid compression level"); ZSTD_freeCCtx(cctx); return false; } ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1); if (ZSTD_isError(ret)) { SysPushErrorGen("Invalid option"); ZSTD_freeCCtx(cctx); return false; } ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, AuMax(stream.threads, 1u)); if (ZSTD_isError(ret)) { SysPushErrorGen(); ZSTD_freeCCtx(cctx); return false; } bool status = true; size_t const toRead = buffInSize; AuUInt outputStat {}, inputStat {}; while (true) { AuUInt read = buffInSize; if (stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(inflatedBuffer.data(), read)) != AuIO::EStreamError::eErrorNone) { break; } inputStat += read; /* Select the flush mode. * If the read may not be finished (read == toRead) we use * ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end. * Zstd optimizes the case where the first flush mode is ZSTD_e_end, * since it knows it is compressing the entire source in one pass. */ int const lastChunk = (read < toRead); ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue; /* Set the input buffer to what we just read. * We compress until the input buffer is empty, each time flushing the * output. */ ZSTD_inBuffer input = { inflatedBuffer.data(), read, 0 }; int finished; do { /* Compress into the output buffer and write all of the output to * the file so we can reuse the buffer next iteration. */ ZSTD_outBuffer output = { deflatedBuffer.data(), buffOutSize, 0 }; size_t const remaining = ZSTD_compressStream2(cctx, &output, &input, mode); if (ZSTD_isError(remaining)) { SysPushErrorGen(": {}", ZSTD_getErrorName(remaining)); ZSTD_freeCCtx(cctx); return false; } { AuUInt copy = output.pos; auto err = AuIO::WriteAll(stream.pWritePipe.get(), AuMemory::MemoryViewStreamRead(output.dst, copy)); outputStat += output.pos; if (copy != output.pos) { SysPushErrorIO(); return false; } } if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) { ZSTD_freeCCtx(cctx); return false; } } /* If we're on the last chunk we're finished when zstd returns 0, * which means its consumed all the input AND finished the frame. * Otherwise, we're finished when we've consumed all the input. */ finished = lastChunk ? (remaining == 0) : (input.pos == input.size); } while (!finished); SysAssertExp(input.pos == input.size); if (lastChunk) break; } ZSTD_freeCCtx(cctx); return true; } static bool CompressZLib(const CompressionPipe &stream, AuUInt8 level, AuInt8 bits) { int ret, flush; z_stream strm {}; unsigned char in[kChunkSize]; unsigned char out[kChunkSize]; if (!stream.pWritePipe) { return false; } if (!stream.pReadPipe) { return false; } ret = deflateInit2(&strm, level, Z_DEFLATED, bits, 8, Z_DEFAULT_STRATEGY); if (ret < Z_OK) { SysPushErrorIO("Error: {}", ret); return false; } AuUInt32 inputStat = 0, outputStat = 0; do { AuUInt read = AuArraySize(in); AuIO::EStreamError error; if (((error = stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) && (error != AuIO::EStreamError::eErrorEndOfStream)) { break; } strm.avail_in = read; inputStat += read; flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? Z_FINISH : Z_NO_FLUSH; strm.next_in = in; do { strm.avail_out = AuArraySize(out); strm.next_out = out; ret = deflate(&strm, flush); if (ret < Z_OK) { SysPushErrorIO("Error: {}", ret); return false; } auto have = AuArraySize(out) - strm.avail_out; { AuUInt copy = have; auto err = AuIO::WriteAll(stream.pWritePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); outputStat += copy; if (copy != have) { SysPushErrorIO(); return false; } } if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) { deflateEnd(&strm); return false; } } } while (strm.avail_out == 0); SysAssert(strm.avail_in == 0); } while (flush != Z_FINISH); deflateEnd(&strm); return true; } static bool DecompressZLib(const CompressionPipe &stream, int bits) { int ret; z_stream strm {}; unsigned char in[kChunkSize]; unsigned char out[kChunkSize]; if (!stream.pWritePipe) { return false; } if (!stream.pReadPipe) { return false; } ret = inflateInit2(&strm, bits); if (ret < Z_OK) { SysPushErrorIO("Error: {}", ret); return false; } AuUInt32 inputStat = 0, outputStat = 0; do { AuUInt read = AuArraySize(in); if (stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone) { break; } inputStat += read; if (!read) { break; } strm.avail_in = read; strm.next_in = in; do { strm.avail_out = AuArraySize(out); strm.next_out = out; ret = inflate(&strm, Z_NO_FLUSH); if (ret < Z_OK) { SysPushErrorIO("Error: {}", ret); inflateEnd(&strm); return false; } auto have = AuArraySize(out) - strm.avail_out; { AuUInt copy = have; auto err = AuIO::WriteAll(stream.pWritePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); outputStat += copy; if (copy != have) { SysPushErrorIO(); return false; } } if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) { inflateEnd(&strm); return false; } } } while (strm.avail_out == 0); SysAssert(strm.avail_in == 0); } while (ret != Z_STREAM_END); inflateEnd(&strm); return true; } static bool CompressBZip2(const CompressionPipe &stream, const CompressionInfo &info) { #if defined(_AUHAS_BZIP2) int ret, flush; bz_stream strm {}; char in[kChunkSize]; char out[kChunkSize]; if (!stream.pWritePipe) { return false; } if (!stream.pReadPipe) { return false; } ret = BZ2_bzCompressInit(&strm, info.compressionLevel, 0, 0); if (ret < BZ_OK) { SysPushErrorIO("Error: {}", ret); return false; } AuUInt32 inputStat = 0, outputStat = 0; do { AuIO::EStreamError error; AuUInt read{ kChunkSize }; if (((error = stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) && (error != AuIO::EStreamError::eErrorEndOfStream)) { break; } strm.avail_in = read; inputStat += read; flush = (!strm.avail_in || error == AuIO::EStreamError::eErrorEndOfStream) ? BZ_FINISH : BZ_RUN; strm.next_in = in; do { strm.avail_out = AuArraySize(out); strm.next_out = out; ret = BZ2_bzCompress(&strm, flush); if (ret < BZ_OK) { SysPushErrorIO("Error: {}", ret); BZ2_bzCompressEnd(&strm); return false; } auto have = AuArraySize(out) - strm.avail_out; { AuUInt copy = have; auto err = AuIO::WriteAll(stream.pWritePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); outputStat += copy; if (copy != have) { SysPushErrorIO(); return false; } } if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) { BZ2_bzCompressEnd(&strm); return false; } } } while (strm.avail_out == 0); SysAssert(strm.avail_in == 0); } while (flush != BZ_FINISH); BZ2_bzCompressEnd(&strm); return true; #else return false; #endif } static bool DecompressBZip2(const CompressionPipe &stream) { #if defined(_AUHAS_BZIP2) int ret; bz_stream strm {}; char in[kChunkSize]; char out[kChunkSize]; if (!stream.pWritePipe) { return false; } if (!stream.pReadPipe) { return false; } ret = BZ2_bzDecompressInit(&strm, 0, 0); if (ret < Z_OK) { SysPushErrorIO("Error: {}", ret); return false; } AuUInt32 inputStat = 0, outputStat = 0; do { AuUInt read = AuArraySize(in); if (stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(in, read)) != AuIO::EStreamError::eErrorNone) { break; } inputStat += read; if (!read) { break; } strm.avail_in = read; strm.next_in = in; do { strm.avail_out = AuArraySize(out); strm.next_out = out; ret = BZ2_bzDecompress(&strm); if (ret < Z_OK) { SysPushErrorIO("Error: {}", ret); BZ2_bzDecompressEnd(&strm); return false; } auto have = AuArraySize(out) - strm.avail_out; { AuUInt copy = have; auto err = AuIO::WriteAll(stream.pWritePipe.get(), AuMemory::MemoryViewStreamRead(out, copy)); outputStat += copy; if (copy != have) { SysPushErrorIO(); return false; } } if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) { BZ2_bzDecompressEnd(&strm); return false; } } } while (strm.avail_out == 0); SysAssert(strm.avail_in == 0); } while (ret != BZ_FINISH); BZ2_bzDecompressEnd(&strm); return true; #else return false; #endif } static bool CompressLZ4(const CompressionPipe &stream, const CompressionInfo &info) { #if defined(_AUHAS_LZ4) bool ret; LZ4F_cctx *cctxPtr; LZ4F_preferences_t pref {}; AuUInt32 inputStat = 0, outputStat = 0; LZ4F_compressOptions_t options {}; char header[512]; ret = true; pref.compressionLevel = info.compressionLevel; pref.autoFlush = true; auto maxFrameSize = info.lz4BlockSize ? info.lz4BlockSize : 64 * 1024; auto buffer = AuSPtr(new char[maxFrameSize], AuDefaultDeleter()); auto maxOut = LZ4F_compressBound(maxFrameSize, &pref); auto outBuffer = AuSPtr(new char[maxOut], AuDefaultDeleter());; auto err = LZ4F_createCompressionContext(&cctxPtr, LZ4F_getVersion()); if (LZ4F_isError(err)) { return false; } // Write header { auto written = LZ4F_compressBegin(cctxPtr, header, AuArraySize(header), &pref); AuUInt copy = written; stream.pWritePipe->Write(AuMemory::MemoryViewStreamRead(header, copy)); SysAssert(copy == written); } while (true) { AuUInt read = maxFrameSize; AuIO::EStreamError error; if (((error = stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(buffer.get(), read))) != AuIO::EStreamError::eErrorNone) && (error != AuIO::EStreamError::eErrorEndOfStream)) { break; } bool flush = (!read || error == AuIO::EStreamError::eErrorEndOfStream); if (!flush) { AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options); if (LZ4F_isError(bufferedBytes)) { ret = false; break; } AuUInt copy = bufferedBytes; auto err = AuIO::WriteAll(stream.pWritePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); outputStat += copy; if (copy != bufferedBytes) { SysPushErrorIO(); return false; } } else { AuUInt32 bufferedBytes = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options); AuUInt copy = bufferedBytes; auto err = AuIO::WriteAll(stream.pWritePipe.get(), AuMemory::MemoryViewStreamRead(outBuffer.get(), copy)); outputStat += copy; if (copy != bufferedBytes) { SysPushErrorIO(); return false; } break; } if (stream.reportProgress) { if (!stream.reportProgress(inputStat, outputStat)) { ret = false; break; } } } LZ4F_freeCompressionContext(cctxPtr); return ret; #else return false; #endif } static bool DecompressLZ4(const CompressionPipe &pipe, AuUInt32 frameMaxSize) { #if defined(_AUHAS_LZ4) bool ret = true; LZ4F_dctx *dctxPtr; AuUInt32 inputStat = 0, outputStat = 0; size_t bytesRemInFrame {}; LZ4F_decompressOptions_t opts {}; AuSPtr bufferIn, bufferOut; if (!pipe.pWritePipe) { return false; } if (!pipe.pReadPipe) { return false; } auto err = LZ4F_createDecompressionContext(&dctxPtr, LZ4F_getVersion()); if (LZ4F_isError(err)) { return false; } bufferIn = AuMakeSharedArray(frameMaxSize); bufferOut = AuMakeSharedArray(frameMaxSize); while (true) { auto frameSize = bytesRemInFrame ? bytesRemInFrame : LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH; auto min = frameSize; if (min > frameMaxSize) { ret = false; break; } if (frameSize) { AuUInt read = frameSize; if (pipe.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(bufferIn.get(), read)) != AuIO::EStreamError::eErrorNone) { break; } if (read != frameSize) { ret = read == 0 && outputStat; break; } inputStat += frameSize; } if (frameSize) { auto mustConsume = frameSize; size_t frameSPtr = mustConsume; size_t frameS2Ptr = frameMaxSize; bytesRemInFrame = LZ4F_decompress(dctxPtr, bufferOut.get(), &frameS2Ptr, bufferIn.get(), &frameSPtr, &opts); if (LZ4F_isError(bytesRemInFrame)) { SysPushErrorIO("{}", LZ4F_getErrorName(bytesRemInFrame)); ret = false; break; } if (frameS2Ptr) { AuUInt copy = frameS2Ptr; auto err = AuIO::WriteAll(pipe.pWritePipe.get(), AuMemory::MemoryViewStreamRead(bufferOut.get(), copy)); outputStat += copy; if (copy != frameS2Ptr) { SysPushErrorIO(); return false; } } if (pipe.reportProgress) { if (!pipe.reportProgress(inputStat, outputStat)) { ret = false; break; } } } } LZ4F_freeDecompressionContext(dctxPtr); return ret; #else return false; #endif } AUKN_SYM bool Compress(const CompressionPipe &stream, const CompressionInfo &info) { CompressionInfo meta = info; AuInt8 bits; if (!stream.pReadPipe) { return {}; } if (!stream.pWritePipe) { return {}; } switch (info.type) { case ECompressionType::eDeflate: case ECompressionType::eZip: case ECompressionType::eGZip: { if (!CompressionLevelFromExternalApi(meta, bits)) { return {}; } return CompressZLib(stream, info.compressionLevel, bits); } case ECompressionType::eZSTD: return CompressZSTD(stream, info); case ECompressionType::eBZIP2: return CompressBZip2(stream, info); case ECompressionType::eLZ4: return CompressLZ4(stream, info); //case ECompressionType::eLZMA: // return CompressLZMA(stream, info); default: return false; } } AUKN_SYM bool Decompress(const CompressionPipe &pipe, const DecompressInfo &meta2) { DecompressInfo meta = meta2; AuInt8 bits; if (!pipe.pReadPipe) { return {}; } if (!pipe.pWritePipe) { return {}; } switch (meta.alg) { case ECompressionType::eDeflate: case ECompressionType::eZip: case ECompressionType::eGZip: { if (!CompressionLevelFromExternalApi(meta, bits)) { return {}; } return DecompressZLib(pipe, bits); } case ECompressionType::eZSTD: return DecompressZSTD(pipe); case ECompressionType::eBZIP2: return DecompressBZip2(pipe); case ECompressionType::eLZ4: return DecompressLZ4(pipe, meta.internalStreamSize / 2); //case ECompressionType::eLZMA: // return DecompressLZMA(pipe); default: return false; } } }