diff --git a/Include/Aurora/Compression/ICompressionStream.hpp b/Include/Aurora/Compression/ICompressionStream.hpp index f773c556..902bb336 100644 --- a/Include/Aurora/Compression/ICompressionStream.hpp +++ b/Include/Aurora/Compression/ICompressionStream.hpp @@ -12,11 +12,14 @@ namespace Aurora::Compression class ICompressionStream { public: - virtual AuStreamReadWrittenPair_t Ingest(AuUInt32 bytesFromInputSource) = 0; + virtual AuStreamReadWrittenPair_t Ingest(AuUInt32 bytesFromUnprocessedInputSource) = 0; + // You should probably check this if you don't want to be DDoS'd virtual AuUInt32 GetInternalBufferSize() = 0; - virtual bool ReadByProcessedN(void * /*opt*/, AuUInt32 minimumProcessed, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS = true) = 0; - virtual bool GoBackByProcessedN(AuUInt32 offset) = 0; + + // Limited stream API + virtual bool ReadByProcessedN (void * /*opt*/, AuUInt32 minimumProcessed, AuStreamReadWrittenPair_t &pair, bool ingestUntilEOS = true) = 0; + virtual bool GoBackByProcessedN (AuUInt32 offset) = 0; virtual bool GoForwardByProcessedN(AuUInt32 offset) = 0; }; } \ No newline at end of file diff --git a/Include/Aurora/IO/Buffered/BlobReader.hpp b/Include/Aurora/IO/Buffered/BlobReader.hpp index 83314868..07918c2c 100644 --- a/Include/Aurora/IO/Buffered/BlobReader.hpp +++ b/Include/Aurora/IO/Buffered/BlobReader.hpp @@ -27,12 +27,12 @@ namespace Aurora::IO::Buffered { auto oldLen = std::exchange(len, 0); - if (buffer_.empty()) return EStreamError::eErrorEndOfStream; - - auto realEndOffset = std::min(AuUInt32(buffer_.size()), oldLen); + auto realEndOffset = std::min(AuUInt32(buffer_.size()) - offset_, oldLen); + if (realEndOffset == 0) return EStreamError::eErrorEndOfStream; len = realEndOffset; - std::memcpy(buffer, buffer_.data(), len); + std::memcpy(buffer, buffer_.data() + offset_, len); + offset_ += realEndOffset; return EStreamError::eErrorNone; } @@ -44,5 +44,6 @@ namespace Aurora::IO::Buffered private: AuList buffer_; + AuUInt32 offset_ {}; }; } \ No newline at end of file diff --git a/Source/Compression/BlockDecompressor.cpp b/Source/Compression/BlockDecompressor.cpp index 0510dc2f..614546d6 100644 --- a/Source/Compression/BlockDecompressor.cpp +++ b/Source/Compression/BlockDecompressor.cpp @@ -123,7 +123,7 @@ namespace Aurora::Compression AuUInt32 outFrameLength = ZSTD_DStreamOutSize(); AuUInt32 done{}, read{}; - while (read != input || userBound_) + while (read != input) { AuUInt32 request = std::min(input, length); if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone) @@ -136,7 +136,6 @@ namespace Aurora::Compression while (input_.pos < input_.size) { - auto maxWrite = std::min(outFrameLength, AuUInt32(this->_outbuffer.RemainingWrite())); ZSTD_outBuffer output = { dout_, outFrameLength, 0 }; auto ret = ZSTD_decompressStream(this->dctx_, &output, &input_); @@ -154,8 +153,6 @@ namespace Aurora::Compression return {}; } } - - this->userBound_ = input_.pos != input_.size; } return AuMakePair(read, done); @@ -165,7 +162,6 @@ namespace Aurora::Compression Aurora::IO::IStreamReader *reader_; ZSTD_DCtx *dctx_; - bool userBound_ {}; char din_[ZSTD_BLOCKSIZE_MAX + 3 /*ZSTD_BLOCKHEADERSIZE*/]; char dout_[ZSTD_BLOCKSIZE_MAX]; ZSTD_inBuffer input_; @@ -203,7 +199,7 @@ namespace Aurora::Compression AuUInt32 done{}, read{}; - while (read < input || userBound_) + while (read < input) { AuUInt32 request = std::min(input, AuUInt32(AuArraySize(din_))); if (this->reader_->Read(din_, request) != IO::EStreamError::eErrorNone) @@ -243,8 +239,6 @@ namespace Aurora::Compression } } while (this->ctx_.avail_out == 0); - - //this->userBound_ = this->ctx_.avail_in != 0; } return AuMakePair(read, done); @@ -257,7 +251,6 @@ namespace Aurora::Compression bool init_ {}; unsigned char din_[4096]; unsigned char dout_[4096]; - bool userBound_ {}; }; class BZIPInflate : public BaseStream @@ -325,8 +318,6 @@ namespace Aurora::Compression } } while (this->ctx_.avail_out == 0); - - //this->userBound_ = this->ctx_.avail_in != 0; } return AuMakePair(read, done); diff --git a/Source/Compression/StreamCompression.cpp b/Source/Compression/StreamCompression.cpp index d4875ace..2d8abf05 100644 --- a/Source/Compression/StreamCompression.cpp +++ b/Source/Compression/StreamCompression.cpp @@ -235,10 +235,13 @@ namespace Aurora::Compression stream.writePipe(out, have); outputStat += have; - if (!stream.reportProgress(inputStat, outputStat)) + if (stream.reportProgress) { - deflateEnd(&strm); - return false; + if (!stream.reportProgress(inputStat, outputStat)) + { + deflateEnd(&strm); + return false; + } } } while (strm.avail_out == 0); @@ -295,10 +298,13 @@ namespace Aurora::Compression stream.writePipe(out, have); outputStat += have; - if (!stream.reportProgress(inputStat, outputStat)) + if (stream.reportProgress) { - inflateEnd(&strm); - return false; + if (!stream.reportProgress(inputStat, outputStat)) + { + inflateEnd(&strm); + return false; + } } } while (strm.avail_out == 0); @@ -353,10 +359,14 @@ namespace Aurora::Compression stream.writePipe(out, have); outputStat += have; - if (!stream.reportProgress(inputStat, outputStat)) + + if (stream.reportProgress) { - BZ2_bzCompressEnd(&strm); - return false; + if (!stream.reportProgress(inputStat, outputStat)) + { + BZ2_bzCompressEnd(&strm); + return false; + } } } while (strm.avail_out == 0); @@ -413,10 +423,13 @@ namespace Aurora::Compression stream.writePipe(out, have); outputStat += have; - if (!stream.reportProgress(inputStat, outputStat)) + if (stream.reportProgress) { - BZ2_bzDecompressEnd(&strm); - return false; + if (!stream.reportProgress(inputStat, outputStat)) + { + BZ2_bzDecompressEnd(&strm); + return false; + } } } while (strm.avail_out == 0); @@ -460,25 +473,36 @@ namespace Aurora::Compression while (true) { auto read = stream.inPipe(buffer.get(), maxFrameSize); - if (!read) break; + inputStat += read; - AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options); - - if (bufferedBytes <= 0) + if (read) { - ret = false; + AuUInt32 bufferedBytes = LZ4F_compressUpdate(cctxPtr, outBuffer.get(), maxOut, buffer.get(), read, &options); + + if (LZ4F_isError(bufferedBytes)) + { + ret = false; + break; + } + + stream.writePipe(outBuffer.get(), bufferedBytes); + + outputStat += bufferedBytes; + } + else + { + AuUInt32 bufferedBytes = LZ4F_compressEnd(cctxPtr, outBuffer.get(), maxOut, &options); + stream.writePipe(outBuffer.get(), bufferedBytes); break; } - - stream.writePipe(outBuffer.get(), bufferedBytes); - inputStat += read; - outputStat += bufferedBytes; - - if (!stream.reportProgress(inputStat, outputStat)) + if (stream.reportProgress) { - ret = false; - break; + if (!stream.reportProgress(inputStat, outputStat)) + { + ret = false; + break; + } } } diff --git a/Source/Console/Flusher.cpp b/Source/Console/Flusher.cpp index b06848b6..5ef5762e 100644 --- a/Source/Console/Flusher.cpp +++ b/Source/Console/Flusher.cpp @@ -44,7 +44,7 @@ namespace Aurora::Console while (!thread->Exiting()) { - Sleep(500); + Threading::Sleep(500); ForceFlush(); } } @@ -56,7 +56,6 @@ namespace Aurora::Console static void InitFlushThread() { - // Startup a runner thread that will take care of all the stress inducing IO every so often on a remote thread Threading::Threads::AbstractThreadVectors handler; handler.DoRun = [](Threading::Threads::IAuroraThread *)