diff --git a/Include/Aurora/Compression/CompressionInfo.hpp b/Include/Aurora/Compression/CompressionInfo.hpp index ca71512d..412cf9f3 100644 --- a/Include/Aurora/Compression/CompressionInfo.hpp +++ b/Include/Aurora/Compression/CompressionInfo.hpp @@ -30,5 +30,8 @@ namespace Aurora::Compression /// (1 << 12) <= dictSize <= (3 << 29) for 64-bit version /// default = (1 << 24) AuUInt32 dictSize{}; + + // 64KiB is a recommended "small" block size + AuUInt16 lz4BlockSize {}; }; } \ No newline at end of file diff --git a/Include/Aurora/Compression/ICompressionStream.hpp b/Include/Aurora/Compression/ICompressionStream.hpp index 05b1d496..75f54de4 100644 --- a/Include/Aurora/Compression/ICompressionStream.hpp +++ b/Include/Aurora/Compression/ICompressionStream.hpp @@ -12,7 +12,7 @@ namespace Aurora::Compression class ICompressionStream { public: - virtual bool Ingest(AuUInt32 minimum, AuUInt32 request) = 0; + virtual std::pair Ingest(AuUInt32 input) = 0; virtual bool Read(void * /*opt*/, AuUInt32 &len, bool ingestUntilEOS = true) = 0; }; diff --git a/Source/Compression/BlockCompressor.cpp b/Source/Compression/BlockCompressor.cpp index ced0df4d..72c28948 100644 --- a/Source/Compression/BlockCompressor.cpp +++ b/Source/Compression/BlockCompressor.cpp @@ -22,7 +22,7 @@ namespace Aurora::Compression { while (this->_outbuffer.size() < len) { - if (!Ingest(0, len)) + if (Ingest(4096).second == 0) { return false; } diff --git a/Source/Compression/BlockDecompressor.cpp b/Source/Compression/BlockDecompressor.cpp index 66c6fd43..1e41c490 100644 --- a/Source/Compression/BlockDecompressor.cpp +++ b/Source/Compression/BlockDecompressor.cpp @@ -22,7 +22,7 @@ namespace Aurora::Compression { while (this->_outbuffer.size() < len) { - if (!Ingest(0, len)) + if (Ingest(4096).second == 0) { return false; } @@ -59,27 +59,22 @@ namespace Aurora::Compression return true; } - bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override + std::pair Ingest(AuUInt32 input) override { AuUInt32 length = ZSTD_DStreamInSize(); void *din = alloca(length); - - AuUInt32 readLength = requestBuffer; - AuUInt32 requested = std::min(readLength, length); - AuUInt32 request = requested; - AuUInt32 done{}; - auto outFrameLength = ZSTD_DStreamOutSize(); void *dout = alloca(outFrameLength); + AuUInt32 done{}, read{}; - while (done != readLength) + while (read != input) { + AuUInt32 request = std::min(input, length); if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone) { - return minimum <= done; + return std::make_pair(read, done); } - - done += request; + read += request; ZSTD_inBuffer input = { din, request, 0 }; while (input.pos < input.size) @@ -90,14 +85,17 @@ namespace Aurora::Compression if (ZSTD_isError(ret)) { SysPushErrorIO("Compression error: {}", ret); - return false; + return std::make_pair(read, 0); } - this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast(output.dst), reinterpret_cast(output.dst) + output.pos); + done += output.pos; + this->_outbuffer.insert(this->_outbuffer.end(), + reinterpret_cast(output.dst), + reinterpret_cast(output.dst) + output.pos); } } - - return true; + + return std::make_pair(read, done); } private: @@ -133,49 +131,54 @@ namespace Aurora::Compression return true; } - bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override + std::pair Ingest(AuUInt32 input) override { int ret; - AuUInt32 readLength = requestBuffer; - unsigned char dout[4096]; - unsigned char din[4096]; - AuUInt32 request = std::min(requestBuffer, AuUInt32(ArraySize(din))); + + AuUInt32 done{}, read{}; - AuUInt32 done{}; - while (done != readLength) + while (read != input) { - if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone) + AuUInt32 request = std::min(input, AuUInt32(ArraySize(din_))); + if (this->_reader->Read(din_, request) != IO::EStreamError::eErrorNone) { - return minimum <= done; + return std::make_pair(read, done); } - done += request; + read += request; this->_ctx.avail_in = request; - this->_ctx.next_in = reinterpret_cast(din); + this->_ctx.next_in = reinterpret_cast(din_); do { - this->_ctx.avail_out = ArraySize(dout); - this->_ctx.next_out = dout; + this->_ctx.avail_out = ArraySize(dout_); + this->_ctx.next_out = dout_; + + if (!this->_ctx.avail_out) + { + break; + } ret = inflate(&this->_ctx, Z_NO_FLUSH); if (ret != Z_OK) { SysPushErrorIO("Error: {}", ret); - return false; + return std::make_pair(read, 0); } - auto have = ArraySize(dout) - this->_ctx.avail_out; - - this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast(dout), reinterpret_cast(dout) + have); + auto have = ArraySize(dout_) - this->_ctx.avail_out; + done += have; + this->_outbuffer.insert(this->_outbuffer.end(), + reinterpret_cast(dout_), + reinterpret_cast(dout_) + have); } while (this->_ctx.avail_out == 0); SysAssert(this->_ctx.avail_in == 0); } - - return true; + + return std::make_pair(read, done); } private: @@ -183,6 +186,8 @@ namespace Aurora::Compression Aurora::IO::IStreamReader *_reader; z_stream _ctx {}; bool _init {}; + unsigned char din_[4096]; + unsigned char dout_[4096]; }; @@ -213,50 +218,48 @@ namespace Aurora::Compression return true; } - bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override + std::pair Ingest(AuUInt32 input) override { int ret; - char dout[4096]; - char din[4096]; - - AuUInt32 readLength = requestBuffer; - AuUInt32 request = std::min(requestBuffer, AuUInt32(ArraySize(din))); - AuUInt32 done{}; - while (done != readLength) + AuUInt32 done{}, read{}; + while (read != input) { - if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone) + AuUInt32 request = std::min(input, AuUInt32(ArraySize(din_))); + if (this->_reader->Read(din_, request) != IO::EStreamError::eErrorNone) { - return minimum <= done; + return std::make_pair(read, done); } - - done += request; + read += request; this->_ctx.avail_in = request; - this->_ctx.next_in = reinterpret_cast(din); + this->_ctx.next_in = reinterpret_cast(din_); do { - this->_ctx.avail_out = ArraySize(dout); - this->_ctx.next_out = dout; + this->_ctx.avail_out = ArraySize(dout_); + this->_ctx.next_out = dout_; ret = BZ2_bzDecompress(&this->_ctx); if (ret != Z_OK) { SysPushErrorIO("Error: {}", ret); - return false; + return std::make_pair(read, 0); } - auto have = ArraySize(dout) - this->_ctx.avail_out; + auto have = ArraySize(dout_) - this->_ctx.avail_out; + done += have; - this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast(dout), reinterpret_cast(dout) + have); + this->_outbuffer.insert(this->_outbuffer.end(), + reinterpret_cast(dout_), + reinterpret_cast(dout_) + have); } while (this->_ctx.avail_out == 0); SysAssert(this->_ctx.avail_in == 0); } - return true; + return std::make_pair(read, done); } private: @@ -264,6 +267,8 @@ namespace Aurora::Compression Aurora::IO::IStreamReader *_reader; bz_stream _ctx {}; bool _init {}; + char dout_[4096]; + char din_[4096]; }; @@ -293,58 +298,45 @@ namespace Aurora::Compression return true; } - bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override + std::pair Ingest(AuUInt32 input) override { - int ret; - char dout[4096]; - char din[4096]; - - AuUInt32 readLength = requestBuffer; - AuUInt32 request = std::min(requestBuffer, AuUInt32(ArraySize(din))); - AuUInt32 done{}; + AuUInt32 done {}, read {}; + std::shared_ptr pinShared; - while (done != readLength) + while (read != input) { - if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone) + AuUInt16 frameSize; + AuUInt32 request = sizeof(frameSize); + + if (this->_reader->Read(&frameSize, request) != IO::EStreamError::eErrorNone) { - return minimum <= done; + return std::make_pair(read, done); + } + read += request; + + std::shared_ptr inFrame(new char[frameSize], std::default_delete()); + std::shared_ptr outFrame(new char[frameSize], std::default_delete()); + if (this->_reader->Read(inFrame.get(), request) != IO::EStreamError::eErrorNone) + { + return std::make_pair(read, done); + } + read += request; + + auto bytes = LZ4_decompress_safe_continue(_lz4Stream, inFrame.get(), outFrame.get(), frameSize, frameSize); + if (bytes <= 0) + { + return std::make_pair(read, 0); } - done += request; + done += bytes; + pinShared = std::move(inFrame); - - int framesLength = done; - char *framesPointer = (char *)din; - while (true) - { - if (framesLength < 2) - { - return false; - } - framesLength -= 2; - - auto frameLength = *reinterpret_cast(framesPointer); - if (frameLength > framesLength) - { - return false; - } - framesPointer += 2; - - auto err = LZ4_decompress_safe_continue(_lz4Stream, framesPointer, dout, frameLength, ArraySize(dout)); - if (err < 0) - { - ret = false; - return false; - } - - this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast(dout), reinterpret_cast(dout) + err); - - framesPointer += framesLength; - framesLength -= framesLength; - } + this->_outbuffer.insert(this->_outbuffer.end(), + reinterpret_cast(outFrame.get()), + reinterpret_cast(outFrame.get()) + bytes); } - - return true; + + return std::make_pair(read, done); } private: @@ -381,8 +373,8 @@ namespace Aurora::Compression { if (!ret->Init(reader)) { - ret = nullptr; delete ret; + ret = nullptr; } } diff --git a/Source/Compression/StreamCompression.cpp b/Source/Compression/StreamCompression.cpp index 706255ce..9416f0c7 100644 --- a/Source/Compression/StreamCompression.cpp +++ b/Source/Compression/StreamCompression.cpp @@ -431,16 +431,21 @@ namespace Aurora::Compression { bool ret = true; LZ4_stream_t* const lz4Stream = LZ4_createStream(); - char inBuf[kChunkSize]; - char outBuf[kChunkSize]; + + int size = info.lz4BlockSize ? info.lz4BlockSize : 64 * 1024; + + std::shared_ptr inBuf(new char[size], std::default_delete()); + std::shared_ptr outBuf(new char[size], std::default_delete()); + + AuUInt32 inputStat = 0, outputStat = 0; while (true) { - auto read = stream.inPipe(inBuf, ArraySize(inBuf)); + auto read = stream.inPipe(inBuf.get() , 64 * 1024); if (!read) break; - AuUInt16 bufferedBytes = LZ4_compress_fast_continue(lz4Stream, inBuf, outBuf, read, ArraySize(outBuf), 1); + AuUInt16 bufferedBytes = LZ4_compress_fast_continue(lz4Stream, inBuf.get(), outBuf.get(), read, size, 1); if (bufferedBytes <= 0) { @@ -449,7 +454,7 @@ namespace Aurora::Compression } stream.writePipe(&bufferedBytes, sizeof(bufferedBytes)); - stream.writePipe(outBuf, bufferedBytes); + stream.writePipe(outBuf.get(), bufferedBytes); inputStat += read; outputStat += bufferedBytes;