[*] Starting to fix up rushed block compressors

This commit is contained in:
Reece Wilson 2021-07-11 11:50:57 +01:00
parent 718375b236
commit dd06417d04
5 changed files with 106 additions and 106 deletions

View File

@ -30,5 +30,8 @@ namespace Aurora::Compression
/// (1 << 12) <= dictSize <= (3 << 29) for 64-bit version
/// default = (1 << 24)
AuUInt32 dictSize{};
// 64KiB is a recommended "small" block size
AuUInt16 lz4BlockSize {};
};
}

View File

@ -12,7 +12,7 @@ namespace Aurora::Compression
class ICompressionStream
{
public:
virtual bool Ingest(AuUInt32 minimum, AuUInt32 request) = 0;
virtual std::pair<AuUInt32, AuUInt32> Ingest(AuUInt32 input) = 0;
virtual bool Read(void * /*opt*/, AuUInt32 &len, bool ingestUntilEOS = true) = 0;
};

View File

@ -22,7 +22,7 @@ namespace Aurora::Compression
{
while (this->_outbuffer.size() < len)
{
if (!Ingest(0, len))
if (Ingest(4096).second == 0)
{
return false;
}

View File

@ -22,7 +22,7 @@ namespace Aurora::Compression
{
while (this->_outbuffer.size() < len)
{
if (!Ingest(0, len))
if (Ingest(4096).second == 0)
{
return false;
}
@ -59,27 +59,22 @@ namespace Aurora::Compression
return true;
}
bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override
std::pair<AuUInt32, AuUInt32> Ingest(AuUInt32 input) override
{
AuUInt32 length = ZSTD_DStreamInSize();
void *din = alloca(length);
AuUInt32 readLength = requestBuffer;
AuUInt32 requested = std::min(readLength, length);
AuUInt32 request = requested;
AuUInt32 done{};
auto outFrameLength = ZSTD_DStreamOutSize();
void *dout = alloca(outFrameLength);
AuUInt32 done{}, read{};
while (done != readLength)
while (read != input)
{
AuUInt32 request = std::min(input, length);
if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone)
{
return minimum <= done;
return std::make_pair(read, done);
}
done += request;
read += request;
ZSTD_inBuffer input = { din, request, 0 };
while (input.pos < input.size)
@ -90,14 +85,17 @@ namespace Aurora::Compression
if (ZSTD_isError(ret))
{
SysPushErrorIO("Compression error: {}", ret);
return false;
return std::make_pair(read, 0);
}
this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast<const AuUInt8 *>(output.dst), reinterpret_cast<const AuUInt8 *>(output.dst) + output.pos);
done += output.pos;
this->_outbuffer.insert(this->_outbuffer.end(),
reinterpret_cast<const AuUInt8 *>(output.dst),
reinterpret_cast<const AuUInt8 *>(output.dst) + output.pos);
}
}
return true;
return std::make_pair(read, done);
}
private:
@ -133,49 +131,54 @@ namespace Aurora::Compression
return true;
}
bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override
std::pair<AuUInt32, AuUInt32> Ingest(AuUInt32 input) override
{
int ret;
AuUInt32 readLength = requestBuffer;
unsigned char dout[4096];
unsigned char din[4096];
AuUInt32 request = std::min(requestBuffer, AuUInt32(ArraySize(din)));
AuUInt32 done{}, read{};
AuUInt32 done{};
while (done != readLength)
while (read != input)
{
if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone)
AuUInt32 request = std::min(input, AuUInt32(ArraySize(din_)));
if (this->_reader->Read(din_, request) != IO::EStreamError::eErrorNone)
{
return minimum <= done;
return std::make_pair(read, done);
}
done += request;
read += request;
this->_ctx.avail_in = request;
this->_ctx.next_in = reinterpret_cast<unsigned char *>(din);
this->_ctx.next_in = reinterpret_cast<unsigned char *>(din_);
do
{
this->_ctx.avail_out = ArraySize(dout);
this->_ctx.next_out = dout;
this->_ctx.avail_out = ArraySize(dout_);
this->_ctx.next_out = dout_;
if (!this->_ctx.avail_out)
{
break;
}
ret = inflate(&this->_ctx, Z_NO_FLUSH);
if (ret != Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
return std::make_pair(read, 0);
}
auto have = ArraySize(dout) - this->_ctx.avail_out;
this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast<const AuUInt8 *>(dout), reinterpret_cast<const AuUInt8 *>(dout) + have);
auto have = ArraySize(dout_) - this->_ctx.avail_out;
done += have;
this->_outbuffer.insert(this->_outbuffer.end(),
reinterpret_cast<const AuUInt8 *>(dout_),
reinterpret_cast<const AuUInt8 *>(dout_) + have);
} while (this->_ctx.avail_out == 0);
SysAssert(this->_ctx.avail_in == 0);
}
return true;
return std::make_pair(read, done);
}
private:
@ -183,6 +186,8 @@ namespace Aurora::Compression
Aurora::IO::IStreamReader *_reader;
z_stream _ctx {};
bool _init {};
unsigned char din_[4096];
unsigned char dout_[4096];
};
@ -213,50 +218,48 @@ namespace Aurora::Compression
return true;
}
bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override
std::pair<AuUInt32, AuUInt32> Ingest(AuUInt32 input) override
{
int ret;
char dout[4096];
char din[4096];
AuUInt32 readLength = requestBuffer;
AuUInt32 request = std::min(requestBuffer, AuUInt32(ArraySize(din)));
AuUInt32 done{};
while (done != readLength)
AuUInt32 done{}, read{};
while (read != input)
{
if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone)
AuUInt32 request = std::min(input, AuUInt32(ArraySize(din_)));
if (this->_reader->Read(din_, request) != IO::EStreamError::eErrorNone)
{
return minimum <= done;
return std::make_pair(read, done);
}
done += request;
read += request;
this->_ctx.avail_in = request;
this->_ctx.next_in = reinterpret_cast<char *>(din);
this->_ctx.next_in = reinterpret_cast<char *>(din_);
do
{
this->_ctx.avail_out = ArraySize(dout);
this->_ctx.next_out = dout;
this->_ctx.avail_out = ArraySize(dout_);
this->_ctx.next_out = dout_;
ret = BZ2_bzDecompress(&this->_ctx);
if (ret != Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
return std::make_pair(read, 0);
}
auto have = ArraySize(dout) - this->_ctx.avail_out;
auto have = ArraySize(dout_) - this->_ctx.avail_out;
done += have;
this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast<const AuUInt8 *>(dout), reinterpret_cast<const AuUInt8 *>(dout) + have);
this->_outbuffer.insert(this->_outbuffer.end(),
reinterpret_cast<const AuUInt8 *>(dout_),
reinterpret_cast<const AuUInt8 *>(dout_) + have);
} while (this->_ctx.avail_out == 0);
SysAssert(this->_ctx.avail_in == 0);
}
return true;
return std::make_pair(read, done);
}
private:
@ -264,6 +267,8 @@ namespace Aurora::Compression
Aurora::IO::IStreamReader *_reader;
bz_stream _ctx {};
bool _init {};
char dout_[4096];
char din_[4096];
};
@ -293,58 +298,45 @@ namespace Aurora::Compression
return true;
}
bool Ingest(AuUInt32 minimum, AuUInt32 requestBuffer) override
std::pair<AuUInt32, AuUInt32> Ingest(AuUInt32 input) override
{
int ret;
char dout[4096];
char din[4096];
AuUInt32 readLength = requestBuffer;
AuUInt32 request = std::min(requestBuffer, AuUInt32(ArraySize(din)));
AuUInt32 done{};
AuUInt32 done {}, read {};
std::shared_ptr<char> pinShared;
while (done != readLength)
while (read != input)
{
if (this->_reader->Read(din, request) != IO::EStreamError::eErrorNone)
AuUInt16 frameSize;
AuUInt32 request = sizeof(frameSize);
if (this->_reader->Read(&frameSize, request) != IO::EStreamError::eErrorNone)
{
return minimum <= done;
return std::make_pair(read, done);
}
read += request;
std::shared_ptr<char> inFrame(new char[frameSize], std::default_delete<char[]>());
std::shared_ptr<char> outFrame(new char[frameSize], std::default_delete<char[]>());
if (this->_reader->Read(inFrame.get(), request) != IO::EStreamError::eErrorNone)
{
return std::make_pair(read, done);
}
read += request;
auto bytes = LZ4_decompress_safe_continue(_lz4Stream, inFrame.get(), outFrame.get(), frameSize, frameSize);
if (bytes <= 0)
{
return std::make_pair(read, 0);
}
done += request;
done += bytes;
pinShared = std::move(inFrame);
int framesLength = done;
char *framesPointer = (char *)din;
while (true)
{
if (framesLength < 2)
{
return false;
}
framesLength -= 2;
auto frameLength = *reinterpret_cast<AuUInt16 *>(framesPointer);
if (frameLength > framesLength)
{
return false;
}
framesPointer += 2;
auto err = LZ4_decompress_safe_continue(_lz4Stream, framesPointer, dout, frameLength, ArraySize(dout));
if (err < 0)
{
ret = false;
return false;
}
this->_outbuffer.insert(this->_outbuffer.end(), reinterpret_cast<const AuUInt8 *>(dout), reinterpret_cast<const AuUInt8 *>(dout) + err);
framesPointer += framesLength;
framesLength -= framesLength;
}
this->_outbuffer.insert(this->_outbuffer.end(),
reinterpret_cast<const AuUInt8 *>(outFrame.get()),
reinterpret_cast<const AuUInt8 *>(outFrame.get()) + bytes);
}
return true;
return std::make_pair(read, done);
}
private:
@ -381,8 +373,8 @@ namespace Aurora::Compression
{
if (!ret->Init(reader))
{
ret = nullptr;
delete ret;
ret = nullptr;
}
}

View File

@ -431,16 +431,21 @@ namespace Aurora::Compression
{
bool ret = true;
LZ4_stream_t* const lz4Stream = LZ4_createStream();
char inBuf[kChunkSize];
char outBuf[kChunkSize];
int size = info.lz4BlockSize ? info.lz4BlockSize : 64 * 1024;
std::shared_ptr<char> inBuf(new char[size], std::default_delete<char[]>());
std::shared_ptr<char> outBuf(new char[size], std::default_delete<char[]>());
AuUInt32 inputStat = 0, outputStat = 0;
while (true)
{
auto read = stream.inPipe(inBuf, ArraySize(inBuf));
auto read = stream.inPipe(inBuf.get() , 64 * 1024);
if (!read) break;
AuUInt16 bufferedBytes = LZ4_compress_fast_continue(lz4Stream, inBuf, outBuf, read, ArraySize(outBuf), 1);
AuUInt16 bufferedBytes = LZ4_compress_fast_continue(lz4Stream, inBuf.get(), outBuf.get(), read, size, 1);
if (bufferedBytes <= 0)
{
@ -449,7 +454,7 @@ namespace Aurora::Compression
}
stream.writePipe(&bufferedBytes, sizeof(bufferedBytes));
stream.writePipe(outBuf, bufferedBytes);
stream.writePipe(outBuf.get(), bufferedBytes);
inputStat += read;
outputStat += bufferedBytes;