/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: BlockDecompressor.cpp Date: 2021-6-17 Author: Reece ***/ #include #include "Compression.hpp" #include "BlockDecompressor.hpp" #include "bzlib.h" #include "zstd.h" #include "zlib.h" #include "lz4.h" namespace Aurora::Compression { bool BaseStream::ReadByProcessedN(void * buffer, AuUInt32 minimumInflated, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS) { AuUInt32 read {}, len {}; if (ingestUntilEOS) { while (this->_outbuffer.RemainingBytes() < minimumInflated) { if (Ingest(4096).second == 0) { if (!this->_outbuffer.RemainingBytes()) { return false; } break; } read += 4096; } } len = this->_outbuffer.Read(buffer, len, buffer == nullptr); pair = {read, len}; return len != 0; } bool BaseStream::GoBackByProcessedN(AuUInt32 offset) { return this->_outbuffer.ReaderTryGoBack(offset); } bool BaseStream::GoForwardByProcessedN(AuUInt32 offset) { return this->_outbuffer.ReaderTryGoForward(offset); } bool BaseStream::Write(const void *a, AuUInt32 length) { auto written = this->_outbuffer.Write(reinterpret_cast(a), length); if (written != length) { auto increase = std::max(0, (int)length - (int)this->_outbuffer.RemainingWrite()); increase += this->_outbuffer.length; if (increase > 64 * 1024 * 1024) { return false; } if (!this->_outbuffer.Resize(increase)) { return false; } auto remaining = length - written; written = this->_outbuffer.Write(reinterpret_cast(a) + written, remaining); if (written != remaining) { return false; } } return true; } AuUInt32 BaseStream::GetInternalBufferSize() { return this->_outbuffer.allocSize; } class ZSTDInflate : public BaseStream { public: ZSTDInflate() : BaseStream() { } ~ZSTDInflate() { if (auto dctx = std::exchange(dctx_, {})) { ZSTD_freeDCtx(dctx); } } bool Init(Aurora::IO::IStreamReader *reader) { this->reader_ = reader; this->dctx_ = ZSTD_createDCtx(); if (!this->dctx_) { SysPushErrorGen("Couldn't create decompressor"); return false; } return true; } AuStreamReadWrittenPair_t Ingest(AuUInt32 input) override { AuUInt32 length = ZSTD_DStreamInSize(); AuUInt32 outFrameLength = ZSTD_DStreamOutSize(); AuUInt32 done{}, read{}; while (read != input) { AuUInt32 request = std::min(input, length); if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone) { return AuMakePair(read, done); } read += request; input_ = ZSTD_inBuffer{ din_, request, 0 }; while (input_.pos < input_.size) { ZSTD_outBuffer output = { dout_, outFrameLength, 0 }; auto ret = ZSTD_decompressStream(this->dctx_, &output, &input_); if (ZSTD_isError(ret)) { SysPushErrorIO("Compression error: {}", ret); return AuMakePair(read, 0); } done += output.pos; if (!Write(reinterpret_cast(output.dst), output.pos)) { return {}; } } } return AuMakePair(read, done); } private: Aurora::IO::IStreamReader *reader_; ZSTD_DCtx *dctx_; char din_[ZSTD_BLOCKSIZE_MAX + 3 /*ZSTD_BLOCKHEADERSIZE*/]; char dout_[ZSTD_BLOCKSIZE_MAX]; ZSTD_inBuffer input_; }; class ZIPInflate : public BaseStream { public: ~ZIPInflate() { if (auto ctx = std::exchange(this->init_, {})) { inflateEnd(&this->ctx_); } } bool Init(Aurora::IO::IStreamReader *reader) { this->reader_ = reader; auto ret = inflateInit(&this->ctx_); if (ret != Z_OK) { SysPushErrorMem("Error: {}", ret); return false; } this->init_ = true; return true; } AuStreamReadWrittenPair_t Ingest(AuUInt32 input) override { int ret; AuUInt32 done{}, read{}; while (read < input) { AuUInt32 request = std::min(input, AuUInt32(AuArraySize(din_))); if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone) { return AuMakePair(read, done); } read += request; this->ctx_.avail_in = request; this->ctx_.next_in = reinterpret_cast(din_); do { this->ctx_.avail_out = AuArraySize(dout_); // std::min(AuUInt32(AuArraySize(dout_)), AuUInt32(this->_outbuffer.RemainingWrite())); this->ctx_.next_out = dout_; if (!this->ctx_.avail_out) { break; } ret = inflate(&this->ctx_, Z_NO_FLUSH); if (ret != Z_OK) { SysPushErrorIO("Error: {}", ret); return AuMakePair(read, 0); } auto have = AuArraySize(dout_) - this->ctx_.avail_out; done += have; if (!Write(reinterpret_cast(dout_), have)) { return {}; } } while (this->ctx_.avail_out == 0); } return AuMakePair(read, done); } private: Aurora::IO::IStreamReader *reader_; z_stream ctx_ {}; bool init_ {}; unsigned char din_[4096]; unsigned char dout_[4096]; }; class BZIPInflate : public BaseStream { public: ~BZIPInflate() { if (auto ctx = std::exchange(this->init_, {})) { BZ2_bzDecompressEnd(&this->ctx_); } } bool Init(Aurora::IO::IStreamReader *reader) { this->reader_ = reader; auto ret = BZ2_bzDecompressInit(&this->ctx_, 0, 0); if (ret != Z_OK) { SysPushErrorMem("Error: {}", ret); return false; } this->init_ = true; return true; } AuStreamReadWrittenPair_t Ingest(AuUInt32 input) override { int ret; AuUInt32 done{}, read{}; while (read < input || userBound_) { AuUInt32 request = std::min(input, AuUInt32(AuArraySize(din_))); if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone) { return AuMakePair(read, done); } read += request; this->ctx_.avail_in = request; this->ctx_.next_in = reinterpret_cast(din_); do { this->ctx_.avail_out = AuArraySize(dout_); this->ctx_.next_out = dout_; ret = BZ2_bzDecompress(&this->ctx_); if (ret != Z_OK) { SysPushErrorIO("Error: {}", ret); return AuMakePair(read, 0); } auto have = AuArraySize(dout_) - this->ctx_.avail_out; done += have; if (!Write(reinterpret_cast(dout_), have)) { return {}; } } while (this->ctx_.avail_out == 0); } return AuMakePair(read, done); } private: Aurora::IO::IStreamReader *reader_; bz_stream ctx_ {}; bool init_ {}; char dout_[4096]; bool userBound_ {}; char din_[4096]; }; class LZ4Inflate : public BaseStream { public: LZ4Inflate() : BaseStream(64 * 1024 * 2) {} ~LZ4Inflate() { } bool Init(Aurora::IO::IStreamReader *reader) { this->reader_ = reader; this->lz4Stream_ = LZ4_createStreamDecode(); if (!this->lz4Stream_) { SysPushErrorMem(); return false; } return true; } AuStreamReadWrittenPair_t Ingest(AuUInt32 input) override { return {}; } private: Aurora::IO::IStreamReader *reader_; LZ4_streamDecode_t* lz4Stream_ {}; }; AUKN_SYM ICompressionStream *DecompressorNew(IO::IStreamReader *reader, ECompresionType type) { BaseStream * ret{}; switch (type) { case ECompresionType::eZSTD: ret = new ZSTDInflate(); break; case ECompresionType::eBZIP2: ret = new BZIPInflate(); break; case ECompresionType::eLZ4: ret = new LZ4Inflate(); break; case ECompresionType::eDeflate: ret = new ZIPInflate(); break; default: ret = nullptr; break; } if (ret) { if (!ret->Init(reader)) { delete ret; ret = nullptr; } } return ret; } AUKN_SYM void DecompressorRelease(ICompressionStream * stream) { SafeDelete(stream); } }