[*] lz4 and buffered stream bug fixes

This commit is contained in:
Reece Wilson 2021-09-06 16:47:35 +01:00
parent c4ef27b01d
commit 329abe97f4
5 changed files with 63 additions and 45 deletions

View File

@ -12,11 +12,14 @@ namespace Aurora::Compression
class ICompressionStream class ICompressionStream
{ {
public: public:
virtual AuStreamReadWrittenPair_t Ingest(AuUInt32 bytesFromInputSource) = 0; virtual AuStreamReadWrittenPair_t Ingest(AuUInt32 bytesFromUnprocessedInputSource) = 0;
// You should probably check this if you don't want to be DDoS'd // You should probably check this if you don't want to be DDoS'd
virtual AuUInt32 GetInternalBufferSize() = 0; virtual AuUInt32 GetInternalBufferSize() = 0;
virtual bool ReadByProcessedN(void * /*opt*/, AuUInt32 minimumProcessed, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS = true) = 0;
virtual bool GoBackByProcessedN(AuUInt32 offset) = 0; // Limited stream API
virtual bool ReadByProcessedN (void * /*opt*/, AuUInt32 minimumProcessed, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS = true) = 0;
virtual bool GoBackByProcessedN (AuUInt32 offset) = 0;
virtual bool GoForwardByProcessedN(AuUInt32 offset) = 0; virtual bool GoForwardByProcessedN(AuUInt32 offset) = 0;
}; };
} }

View File

@ -27,12 +27,12 @@ namespace Aurora::IO::Buffered
{ {
auto oldLen = std::exchange(len, 0); auto oldLen = std::exchange(len, 0);
if (buffer_.empty()) return EStreamError::eErrorEndOfStream; auto realEndOffset = std::min(AuUInt32(buffer_.size()) - offset_, oldLen);
if (realEndOffset == 0) return EStreamError::eErrorEndOfStream;
auto realEndOffset = std::min(AuUInt32(buffer_.size()), oldLen);
len = realEndOffset; len = realEndOffset;
std::memcpy(buffer, buffer_.data(), len); std::memcpy(buffer, buffer_.data() + offset_, len);
offset_ += realEndOffset;
return EStreamError::eErrorNone; return EStreamError::eErrorNone;
} }
@ -44,5 +44,6 @@ namespace Aurora::IO::Buffered
private: private:
AuList<AuUInt8> buffer_; AuList<AuUInt8> buffer_;
AuUInt32 offset_ {};
}; };
} }

View File

@ -123,7 +123,7 @@ namespace Aurora::Compression
AuUInt32 outFrameLength = ZSTD_DStreamOutSize(); AuUInt32 outFrameLength = ZSTD_DStreamOutSize();
AuUInt32 done{}, read{}; AuUInt32 done{}, read{};
while (read != input || userBound_) while (read != input)
{ {
AuUInt32 request = std::min(input, length); AuUInt32 request = std::min(input, length);
if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone) if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone)
@ -136,7 +136,6 @@ namespace Aurora::Compression
while (input_.pos < input_.size) while (input_.pos < input_.size)
{ {
auto maxWrite = std::min(outFrameLength, AuUInt32(this->_outbuffer.RemainingWrite()));
ZSTD_outBuffer output = { dout_, outFrameLength, 0 }; ZSTD_outBuffer output = { dout_, outFrameLength, 0 };
auto ret = ZSTD_decompressStream(this->dctx_, &output, &input_); auto ret = ZSTD_decompressStream(this->dctx_, &output, &input_);
@ -154,8 +153,6 @@ namespace Aurora::Compression
return {}; return {};
} }
} }
this->userBound_ = input_.pos != input_.size;
} }
return AuMakePair(read, done); return AuMakePair(read, done);
@ -165,7 +162,6 @@ namespace Aurora::Compression
Aurora::IO::IStreamReader *reader_; Aurora::IO::IStreamReader *reader_;
ZSTD_DCtx *dctx_; ZSTD_DCtx *dctx_;
bool userBound_ {};
char din_[ZSTD_BLOCKSIZE_MAX + 3 /*ZSTD_BLOCKHEADERSIZE*/]; char din_[ZSTD_BLOCKSIZE_MAX + 3 /*ZSTD_BLOCKHEADERSIZE*/];
char dout_[ZSTD_BLOCKSIZE_MAX]; char dout_[ZSTD_BLOCKSIZE_MAX];
ZSTD_inBuffer input_; ZSTD_inBuffer input_;
@ -203,7 +199,7 @@ namespace Aurora::Compression
AuUInt32 done{}, read{}; AuUInt32 done{}, read{};
while (read < input || userBound_) while (read < input)
{ {
AuUInt32 request = std::min(input, AuUInt32(AuArraySize(din_))); AuUInt32 request = std::min(input, AuUInt32(AuArraySize(din_)));
if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone) if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone)
@ -243,8 +239,6 @@ namespace Aurora::Compression
} }
} while (this->ctx_.avail_out == 0); } while (this->ctx_.avail_out == 0);
//this->userBound_ = this->ctx_.avail_in != 0;
} }
return AuMakePair(read, done); return AuMakePair(read, done);
@ -257,7 +251,6 @@ namespace Aurora::Compression
bool init_ {}; bool init_ {};
unsigned char din_[4096]; unsigned char din_[4096];
unsigned char dout_[4096]; unsigned char dout_[4096];
bool userBound_ {};
}; };
class BZIPInflate : public BaseStream class BZIPInflate : public BaseStream
@ -325,8 +318,6 @@ namespace Aurora::Compression
} }
} while (this->ctx_.avail_out == 0); } while (this->ctx_.avail_out == 0);
//this->userBound_ = this->ctx_.avail_in != 0;
} }
return AuMakePair(read, done); return AuMakePair(read, done);

View File

@ -235,10 +235,13 @@ namespace Aurora::Compression
stream.writePipe(out, have); stream.writePipe(out, have);
outputStat += have; outputStat += have;
if (!stream.reportProgress(inputStat, outputStat)) if (stream.reportProgress)
{ {
deflateEnd(&strm); if (!stream.reportProgress(inputStat, outputStat))
return false; {
deflateEnd(&strm);
return false;
}
} }
} while (strm.avail_out == 0); } while (strm.avail_out == 0);
@ -295,10 +298,13 @@ namespace Aurora::Compression
stream.writePipe(out, have); stream.writePipe(out, have);
outputStat += have; outputStat += have;
if (!stream.reportProgress(inputStat, outputStat)) if (stream.reportProgress)
{ {
inflateEnd(&strm); if (!stream.reportProgress(inputStat, outputStat))
return false; {
inflateEnd(&strm);
return false;
}
} }
} while (strm.avail_out == 0); } while (strm.avail_out == 0);
@ -353,10 +359,14 @@ namespace Aurora::Compression
stream.writePipe(out, have); stream.writePipe(out, have);
outputStat += have; outputStat += have;
if (!stream.reportProgress(inputStat, outputStat))
if (stream.reportProgress)
{ {
BZ2_bzCompressEnd(&strm); if (!stream.reportProgress(inputStat, outputStat))
return false; {
BZ2_bzCompressEnd(&strm);
return false;
}
} }
} while (strm.avail_out == 0); } while (strm.avail_out == 0);
@ -413,10 +423,13 @@ namespace Aurora::Compression
stream.writePipe(out, have); stream.writePipe(out, have);
outputStat += have; outputStat += have;
if (!stream.reportProgress(inputStat, outputStat)) if (stream.reportProgress)
{ {
BZ2_bzDecompressEnd(&strm); if (!stream.reportProgress(inputStat, outputStat))
return false; {
BZ2_bzDecompressEnd(&strm);
return false;
}
} }
} while (strm.avail_out == 0); } while (strm.avail_out == 0);
@ -460,25 +473,36 @@ namespace Aurora::Compression
while (true) while (true)
{ {
auto read = stream.inPipe(buffer.get(), maxFrameSize); auto read = stream.inPipe(buffer.get(), maxFrameSize);
if (!read) break; inputStat += read;
AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options); if (read)
if (bufferedBytes <= 0)
{ {
ret = false; AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options);
if (LZ4F_isError(bufferedBytes))
{
ret = false;
break;
}
stream.writePipe(outBuffer.get(), bufferedBytes);
outputStat += bufferedBytes;
}
else
{
AuUInt32 bufferedBytes = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options);
stream.writePipe(outBuffer.get(), bufferedBytes);
break; break;
} }
stream.writePipe(outBuffer.get(), bufferedBytes);
inputStat += read; if (stream.reportProgress)
outputStat += bufferedBytes;
if (!stream.reportProgress(inputStat, outputStat))
{ {
ret = false; if (!stream.reportProgress(inputStat, outputStat))
break; {
ret = false;
break;
}
} }
} }

View File

@ -44,7 +44,7 @@ namespace Aurora::Console
while (!thread->Exiting()) while (!thread->Exiting())
{ {
Sleep(500); Threading::Sleep(500);
ForceFlush(); ForceFlush();
} }
} }
@ -56,7 +56,6 @@ namespace Aurora::Console
static void InitFlushThread() static void InitFlushThread()
{ {
// Startup a runner thread that will take care of all the stress inducing IO every so often on a remote thread // Startup a runner thread that will take care of all the stress inducing IO every so often on a remote thread
Threading::Threads::AbstractThreadVectors handler; Threading::Threads::AbstractThreadVectors handler;
handler.DoRun = [](Threading::Threads::IAuroraThread *) handler.DoRun = [](Threading::Threads::IAuroraThread *)