More compression work, still requires polish and clean up

This commit is contained in:
Reece Wilson 2021-09-07 08:37:15 +01:00
parent eb6a1eaad0
commit bef72f8e4a
10 changed files with 207 additions and 104 deletions

View File

@ -34,4 +34,10 @@ namespace Aurora::Compression
// 64KiB is a recommended "small" block size
AuUInt16 lz4BlockSize {};
};
/// Options handed to the decompression entry points (DecompressorNew / Decompress).
struct DecompressInfo
{
    /// Algorithm used to decode the input; the entry points switch on this value.
    /// NOTE(review): intentionally left without a default because the meaning of the
    /// zero enumerator is not visible from here - callers must always set it.
    ECompresionType alg;

    /// Size hint for the decompressor's internal buffers.
    /// Zero means "use the default": DecompressorNew and Decompress both replace a
    /// zero value with 1024 * 64 * 2. Value-initialised so that a caller who only
    /// sets `alg` gets that default instead of reading an indeterminate value.
    AuUInt32 internalStreamSize {};
};
}

View File

@ -19,6 +19,7 @@ namespace Aurora::Compression
// Limited stream API
virtual bool ReadByProcessedN (void * /*opt*/, AuUInt32 minimumProcessed, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS = true) = 0;
virtual bool ReadByProcessedN (void * /*opt*/, AuUInt32 minimumInflated) = 0;
virtual bool GoBackByProcessedN (AuUInt32 offset) = 0;
virtual bool GoForwardByProcessedN(AuUInt32 offset) = 0;
};

View File

@ -12,7 +12,6 @@ namespace Aurora::Compression
struct CompressionPipe
{
/// algorithm
ECompresionType type;
/// LZMA decompression + compression, and ZSTD compression only
AuUInt32 threads;
@ -27,6 +26,6 @@ namespace Aurora::Compression
std::function<bool(AuUInt, AuUInt)> reportProgress;
};
AUKN_SYM bool Decompress(const CompressionPipe &stream);
AUKN_SYM bool Decompress(const CompressionPipe &stream, const DecompressInfo &meta);
AUKN_SYM bool Compress(const CompressionPipe &stream, const CompressionInfo &info);
}

View File

@ -11,6 +11,6 @@
namespace Aurora::Compression
{
AUKN_SHARED_API(Decompressor, ICompressionStream, Aurora::IO::IStreamReader *reader, ECompresionType type);
AUKN_SHARED_API(Compressor, ICompressionStream, Aurora::IO::IStreamReader *reader, const CompressionInfo &info);
AUKN_SHARED_API(Decompressor, ICompressionStream, const AuSPtr<Aurora::IO::IStreamReader> &reader, const DecompressInfo &info);
AUKN_SHARED_API(Compressor, ICompressionStream, const AuSPtr<Aurora::IO::IStreamReader> &reader, const CompressionInfo &info);
}

View File

@ -13,10 +13,11 @@
#include "zstd.h"
#include "zlib.h"
#include "lz4.h"
#include "lz4frame.h"
namespace Aurora::Compression
{
bool BaseStream::ReadByProcessedN(void * buffer, AuUInt32 minimumInflated, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS)
bool BaseStream::ReadByProcessedN(void *buffer, AuUInt32 minimumInflated, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS)
{
AuUInt32 read {}, len {};
@ -38,11 +39,18 @@ namespace Aurora::Compression
}
}
len = this->_outbuffer.Read(buffer, len, buffer == nullptr);
len = this->_outbuffer.Read(buffer, minimumInflated, buffer == nullptr);
pair = {read, len};
return len != 0;
}
bool BaseStream::ReadByProcessedN(void *buffer, AuUInt32 minimumInflated)
{
AuUInt32 read {}, len {};
len = this->_outbuffer.Read(buffer, minimumInflated, buffer == nullptr);
return len != 0;
}
bool BaseStream::GoBackByProcessedN(AuUInt32 offset)
{
return this->_outbuffer.ReaderTryGoBack(offset);
@ -93,10 +101,10 @@ namespace Aurora::Compression
class ZSTDInflate : public BaseStream
{
public:
ZSTDInflate() : BaseStream()
{
AuUInt32 bufferSize;
}
ZSTDInflate(AuUInt32 bufferSize) : bufferSize(bufferSize), BaseStream(bufferSize)
{}
~ZSTDInflate()
{
@ -106,7 +114,7 @@ namespace Aurora::Compression
}
}
bool Init(Aurora::IO::IStreamReader *reader)
bool Init(const AuSPtr<Aurora::IO::IStreamReader> &reader)
{
this->reader_ = reader;
this->dctx_ = ZSTD_createDCtx();
@ -163,7 +171,7 @@ namespace Aurora::Compression
private:
Aurora::IO::IStreamReader *reader_;
AuSPtr<Aurora::IO::IStreamReader> reader_;
ZSTD_DCtx *dctx_;
char din_[ZSTD_BLOCKSIZE_MAX + 3 /*ZSTD_BLOCKHEADERSIZE*/];
char dout_[ZSTD_BLOCKSIZE_MAX];
@ -173,20 +181,25 @@ namespace Aurora::Compression
class ZIPInflate : public BaseStream
{
public:
AuUInt32 bufferSize;
ZIPInflate(AuUInt32 bufferSize) : bufferSize(bufferSize), BaseStream(bufferSize)
{}
~ZIPInflate()
{
if (auto ctx = std::exchange(this->init_, {}))
if (this->init_)
{
inflateEnd(&this->ctx_);
}
}
bool Init(Aurora::IO::IStreamReader *reader)
bool Init(const AuSPtr<Aurora::IO::IStreamReader> &reader)
{
this->reader_ = reader;
auto ret = inflateInit(&this->ctx_);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorMem("Error: {}", ret);
return false;
@ -226,7 +239,7 @@ namespace Aurora::Compression
}
ret = inflate(&this->ctx_, Z_NO_FLUSH);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return AuMakePair(read, 0);
@ -249,7 +262,7 @@ namespace Aurora::Compression
private:
Aurora::IO::IStreamReader *reader_;
AuSPtr<Aurora::IO::IStreamReader> reader_;
z_stream ctx_ {};
bool init_ {};
unsigned char din_[4096];
@ -259,20 +272,25 @@ namespace Aurora::Compression
class BZIPInflate : public BaseStream
{
public:
AuUInt32 bufferSize;
BZIPInflate(AuUInt32 bufferSize) : bufferSize(bufferSize), BaseStream(bufferSize)
{}
~BZIPInflate()
{
if (auto ctx = std::exchange(this->init_, {}))
if (this->init_)
{
BZ2_bzDecompressEnd(&this->ctx_);
}
}
bool Init(Aurora::IO::IStreamReader *reader)
bool Init(const AuSPtr<Aurora::IO::IStreamReader> &reader)
{
this->reader_ = reader;
auto ret = BZ2_bzDecompressInit(&this->ctx_, 0, 0);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorMem("Error: {}", ret);
return false;
@ -287,7 +305,7 @@ namespace Aurora::Compression
int ret;
AuUInt32 done{}, read{};
while (read < input || userBound_)
while (read < input)
{
AuUInt32 request = std::min(input, AuUInt32(AuArraySize(din_)));
if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone)
@ -305,7 +323,7 @@ namespace Aurora::Compression
this->ctx_.next_out = dout_;
ret = BZ2_bzDecompress(&this->ctx_);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return AuMakePair(read, 0);
@ -328,7 +346,7 @@ namespace Aurora::Compression
private:
Aurora::IO::IStreamReader *reader_;
AuSPtr<Aurora::IO::IStreamReader> reader_;
bz_stream ctx_ {};
bool init_ {};
char dout_[4096];
@ -340,22 +358,27 @@ namespace Aurora::Compression
{
public:
LZ4Inflate() : BaseStream(64 * 1024 * 2)
AuUInt32 bufferSize;
LZ4Inflate(AuUInt32 bufferSize) : bufferSize(bufferSize), BaseStream(bufferSize / 2)
{}
~LZ4Inflate()
{
if (lz4Stream_)
{
LZ4F_freeDecompressionContext(lz4Stream_);
}
}
bool Init(Aurora::IO::IStreamReader *reader)
bool Init(const AuSPtr<Aurora::IO::IStreamReader> &reader)
{
this->reader_ = reader;
this->lz4Stream_ = LZ4_createStreamDecode();
if (!this->lz4Stream_)
auto err = LZ4F_createDecompressionContext(&lz4Stream_, LZ4F_getVersion());
if (LZ4F_isError(err))
{
SysPushErrorMem();
return false;
return {};
}
return true;
@ -364,32 +387,103 @@ namespace Aurora::Compression
/// Pulls compressed bytes from `reader_`, runs them through the LZ4 frame decoder and
/// forwards every decoded chunk to Write().
///
/// @param input upper bound on the number of compressed bytes this call may consume
/// @return AuMakePair(compressed bytes consumed, decompressed bytes produced) on success;
///         a value-initialised pair on any failure (oversized chunk, reader error,
///         LZ4F error, or Write() rejecting the output)
AuStreamReadWrittenPair_t Ingest(AuUInt32 input) override
{
    // Input and output scratch buffers each get half of the configured stream size.
    const AuUInt32 halfBuffer = bufferSize / 2;

    AuUInt32 inputStat = 0, outputStat = 0;
    size_t bytesRemInFrame {};
    LZ4F_decompressOptions_t opts {};

    auto bufferIn  = AuSPtr<char>(new char[halfBuffer], std::default_delete<char[]>());
    auto bufferOut = AuSPtr<char>(new char[halfBuffer], std::default_delete<char[]>());

    // NOTE(review): bytesRemInFrame is local, so the decoder's size hint is forgotten
    // between Ingest() calls and every call starts by requesting a header-sized read.
    // Confirm this is intended when a frame spans several calls.
    while (inputStat < input)
    {
        // Ask for what the decoder said it still needs, or just enough bytes to learn
        // the header length when no hint is available. This is never zero.
        const size_t frameSize = bytesRemInFrame ? bytesRemInFrame : LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH;

        // The chunk has to fit into the input scratch buffer.
        if (frameSize > halfBuffer)
        {
            return {};
        }

        AuUInt32 request = static_cast<AuUInt32>(frameSize);
        if ((input < (inputStat + request)) ||
            (this->reader_->Read(bufferIn.get(), request) != IO::EStreamError::eErrorNone) ||
            (request != frameSize))
        {
            // Only an empty read after at least one consumed chunk counts as a clean
            // end of stream; every other way of landing here is a failure.
            if ((request == 0) && inputStat)
            {
                break;
            }
            return {};
        }
        inputStat += frameSize;

        size_t consumed = frameSize;  // in: bytes available in bufferIn, out: bytes the decoder used
        size_t produced = halfBuffer; // in: capacity of bufferOut,       out: bytes the decoder wrote

        // NOTE(review): `consumed` is not inspected afterwards; if the decoder takes fewer
        // bytes than offered, the remainder of bufferIn is dropped. Verify against the
        // LZ4F_decompress contract.
        bytesRemInFrame = LZ4F_decompress(lz4Stream_, bufferOut.get(), &produced, bufferIn.get(), &consumed, &opts);
        if (LZ4F_isError(bytesRemInFrame))
        {
            return {};
        }

        if (produced && !Write(bufferOut.get(), produced))
        {
            return {};
        }
        outputStat += produced;
    }

    return AuMakePair(inputStat, outputStat);
}
private:
Aurora::IO::IStreamReader *reader_;
LZ4_streamDecode_t* lz4Stream_ {};
AuSPtr<Aurora::IO::IStreamReader> reader_;
LZ4F_dctx* lz4Stream_ {};
};
AUKN_SYM ICompressionStream *DecompressorNew(IO::IStreamReader *reader, ECompresionType type)
AUKN_SYM ICompressionStream *DecompressorNew(const AuSPtr<Aurora::IO::IStreamReader> &reader, const DecompressInfo &ref)
{
DecompressInfo info = ref;
BaseStream * ret{};
switch (type)
if (!info.internalStreamSize)
{
info.internalStreamSize = 1024 * 64 * 2;
}
switch (info.alg)
{
case ECompresionType::eZSTD:
ret = new ZSTDInflate();
ret = new ZSTDInflate(info.internalStreamSize);
break;
case ECompresionType::eBZIP2:
ret = new BZIPInflate();
ret = new BZIPInflate(info.internalStreamSize);
break;
case ECompresionType::eLZ4:
ret = new LZ4Inflate();
ret = new LZ4Inflate(info.internalStreamSize);
break;
case ECompresionType::eDeflate:
ret = new ZIPInflate();
ret = new ZIPInflate(info.internalStreamSize);
break;
default:
ret = nullptr;

View File

@ -16,8 +16,9 @@ namespace Aurora::Compression
virtual ~BaseStream() {}
virtual bool Init(Aurora::IO::IStreamReader *reader) = 0;
virtual bool Init(const AuSPtr<Aurora::IO::IStreamReader> &reader) = 0;
virtual bool ReadByProcessedN(void * /*opt*/, AuUInt32 minimumInflated) override;
virtual bool ReadByProcessedN(void * /*opt*/, AuUInt32 minimumInflated, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS = true) override;
virtual bool GoBackByProcessedN(AuUInt32 offset) override;
virtual bool GoForwardByProcessedN(AuUInt32 offset) override;

View File

@ -231,7 +231,7 @@ namespace Aurora::Compression
}
ret = deflateInit(&strm, info.compressionLevel);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
@ -255,7 +255,7 @@ namespace Aurora::Compression
strm.next_out = out;
ret = deflate(&strm, flush);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
@ -302,7 +302,7 @@ namespace Aurora::Compression
}
ret = inflateInit(&strm);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
@ -328,7 +328,7 @@ namespace Aurora::Compression
strm.next_out = out;
ret = inflate(&strm, Z_NO_FLUSH);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
inflateEnd(&strm);
@ -376,7 +376,7 @@ namespace Aurora::Compression
}
ret = BZ2_bzCompressInit(&strm, info.compressionLevel, 0, 0);
if (ret != BZ_OK)
if (ret < BZ_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
@ -400,9 +400,10 @@ namespace Aurora::Compression
strm.next_out = out;
ret = BZ2_bzCompress(&strm, flush);
if (ret != BZ_OK)
if (ret < BZ_OK)
{
SysPushErrorIO("Error: {}", ret);
BZ2_bzCompressEnd(&strm);
return false;
}
@ -448,7 +449,7 @@ namespace Aurora::Compression
}
ret = BZ2_bzDecompressInit(&strm, 0, 0);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
return false;
@ -474,9 +475,10 @@ namespace Aurora::Compression
strm.next_out = out;
ret = BZ2_bzDecompress(&strm);
if (ret != Z_OK)
if (ret < Z_OK)
{
SysPushErrorIO("Error: {}", ret);
BZ2_bzDecompressEnd(&strm);
return false;
}
@ -571,14 +573,14 @@ namespace Aurora::Compression
LZ4F_freeCompressionContext(cctxPtr);
return ret;
}
static bool DecompressLZ4(const CompressionPipe &pipe)
static bool DecompressLZ4(const CompressionPipe &pipe, AuUInt32 frameMaxSize)
{
bool ret = true;
LZ4F_dctx *dctxPtr;
AuUInt32 inputStat = 0, outputStat = 0;
size_t bytesRemInFrame {};
LZ4F_decompressOptions_t opts {};
AuUInt32 lastFrameMaxSize {};
AuSPtr<char> bufferIn, bufferOut;
if (!pipe.writePipe)
@ -596,67 +598,44 @@ namespace Aurora::Compression
{
return false;
}
bufferIn = AuSPtr<char>(new char[frameMaxSize / 2], std::default_delete<char[]>());
bufferOut = AuSPtr<char>(new char[frameMaxSize / 2], std::default_delete<char[]>());
while (true)
{
char header[LZ4F_HEADER_SIZE_MAX];
auto frameSize = bytesRemInFrame ? bytesRemInFrame : LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH;
auto min = frameSize;
LZ4F_frameInfo_t info {};
// Read header
if (pipe.inPipe(header, LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH) != LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH)
if (min > (frameMaxSize / 2))
{
return {};
ret = false;
break;
}
//
auto sizeOfheader = LZ4F_headerSize(header, LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH);
if (LZ4F_isError(sizeOfheader))
if (frameSize)
{
return {};
}
const auto req = sizeOfheader - LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH;
if (pipe.inPipe(header + LZ4F_MIN_SIZE_TO_KNOW_HEADER_LENGTH, req) != req)
{
return {};
}
inputStat += sizeOfheader;
size_t sizePtr = sizeOfheader;
auto status = LZ4F_getFrameInfo(dctxPtr, &info, header, &sizePtr);
if (sizePtr != sizeOfheader)
{
return {};
}
auto frameSize = status;
auto min = std::max(info.contentSize, frameSize);
if (min > lastFrameMaxSize)
{
lastFrameMaxSize = min;
if (lastFrameMaxSize > 64 * 1024 * 1024)
auto read = pipe.inPipe(bufferIn.get(), frameSize);
if (read != frameSize)
{
return {};
ret = read == 0 && outputStat;
break;
}
bufferIn = AuSPtr<char>(new char[lastFrameMaxSize], std::default_delete<char[]>());
bufferOut = AuSPtr<char>(new char[lastFrameMaxSize], std::default_delete<char[]>());
inputStat += frameSize;
}
if (pipe.inPipe(bufferIn.get(), frameSize) != frameSize)
if (frameSize)
{
return {};
}
inputStat += frameSize;
if (info.contentSize)
{
size_t frameSPtr = frameSize;
size_t frameS2Ptr = info.contentSize;
LZ4F_decompress(dctxPtr, bufferIn.get(), &frameSPtr, bufferOut.get(), &frameS2Ptr, &opts);
auto mustConsume = frameSize;
size_t frameSPtr = mustConsume;
size_t frameS2Ptr = frameMaxSize;
bytesRemInFrame = LZ4F_decompress(dctxPtr, bufferOut.get(), &frameS2Ptr, bufferIn.get(), &frameSPtr, &opts);
if (LZ4F_isError(bytesRemInFrame))
{
ret = false;
break;
}
if (frameS2Ptr)
{
pipe.writePipe(bufferOut.get(), frameS2Ptr);
@ -664,10 +643,13 @@ namespace Aurora::Compression
outputStat += frameS2Ptr;
if (!pipe.reportProgress(inputStat, outputStat))
if (pipe.reportProgress)
{
ret = false;
break;
if (!pipe.reportProgress(inputStat, outputStat))
{
ret = false;
break;
}
}
}
}
@ -695,9 +677,14 @@ namespace Aurora::Compression
}
}
AUKN_SYM bool Decompress(const CompressionPipe &pipe)
AUKN_SYM bool Decompress( const CompressionPipe &pipe, const DecompressInfo &meta2)
{
switch (pipe.type)
DecompressInfo meta = meta2;
if (!meta.internalStreamSize)
{
meta.internalStreamSize = 1024 * 64 * 2;
}
switch (meta.alg)
{
case ECompresionType::eZSTD:
return DecompressZSTD(pipe);
@ -706,7 +693,7 @@ namespace Aurora::Compression
case ECompresionType::eBZIP2:
return DecompressBZip2(pipe);
case ECompresionType::eLZ4:
return DecompressLZ4(pipe);
return DecompressLZ4(pipe, meta.internalStreamSize);
//case ECompresionType::eLZMA:
// return DecompressLZMA(pipe);
default:

View File

@ -68,7 +68,7 @@ namespace Aurora::Console::Commands
offset = 0;
Parse::ParseState consumable(Parse::StringToConsumable(cmdParse, offset));
auto status = Parse::Parse(consumable, cmdEntry.commandStructure, res);
auto status = Parse::Parse(consumable, cmdEntry.commandStructure, res);
if (!status)
{

View File

@ -205,13 +205,28 @@ namespace Aurora::Debug
AUKN_SYM void _PushError(AuUInt address, FailureCategory category, const char *msg)
{
LastError error{ address, category, msg ? msg : "" };
// Oi, developer
#if defined(DEBUG)
DebugBreak();
#endif
// Cry about it to telemetry with other errors if available
AuUInt32 rng = GetFenceId();
Telemetry::InsertManualFence(rng);
Telemetry::InsertMsgError(error);
TryGetOrFetchCError();
TryGetOrFetchOSError();
Telemetry::InsertManualFence(rng);
// Is anyone listening?
// Print to console if internal
// Eh, dont spam nested to a logger if its not going to make sense to a normie
if ((category == FailureCategory::kFailureNested) && (msg == nullptr))
{
return;
}
#if defined(DEBUG) || defined(STAGING)
PrintError();

View File

@ -21,7 +21,7 @@ namespace Aurora::Locale::Encoding
TextStreamDecoderImpl(ECodePage page = ECodePage::eSysUnk) : defaultPage(page) {}
using TypeIn_t = std::conditional_t<optimized, void *, const void *>;
using TypeIn_t = std::conditional_t<optimized, void *, const void *>;
using TypeCast_t = std::conditional_t<optimized, AuUInt8 *, const AuUInt8 *>;