/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuBaseStream.cpp Date: 2022-2-14 Author: Reece ***/ #include #include "AuCompression.hpp" #include "AuIngestableReadBase.hpp" #include "AuBaseStream.hpp" namespace Aurora::Compression { AuStreamReadWrittenPair_t BaseStream::ReadEx(const AuMemoryViewWrite &/*optional/nullable*/destination, bool bIngestUntilEOS) { AU_LOCK_GUARD(this->_spinlock); AuUInt32 dwRead {}, dwBytesWritten {}; if (!destination.length && !destination.ptr) { return {0, this->pOutputBuffer_->RemainingBytes()}; } if (bIngestUntilEOS) { while (this->pOutputBuffer_->RemainingBytes() < destination.length) { auto toRead = destination.length ? AuUInt32(destination.length - this->pOutputBuffer_->RemainingBytes()) : 10 * 1024; // TODO: I was trying to get out of stream memory to explode less with real code. bIngestUntilEOS users are usually lazy decompressors // This was just to give a chance to yield, because theres no API to process after end of write condition; we always ended up failing. // ... // I think this rate limiter can be removed. Something else was the primary factor. toRead = AuMin(toRead, 1024 * 10); auto realPair = Ingest_s(toRead); dwRead += realPair.first; if (realPair.second == 0) { //if (!this->pOutputBuffer_->RemainingBytes(true)) //{ // return {}; //} break; } } } dwBytesWritten = this->pOutputBuffer_->Read(destination.ptr, destination.length, destination.ptr == nullptr); return {dwRead, dwBytesWritten}; } AuUInt32 BaseStream::GetAvailableProcessedBytes() { AU_LOCK_GUARD(this->_spinlock); return this->pOutputBuffer_->RemainingBytes(true); } AuUInt32 BaseStream::Read(const AuMemoryViewWrite & /*opt*/ destination) { AU_LOCK_GUARD(this->_spinlock); if (!destination.length && !destination.ptr) { return this->pOutputBuffer_->RemainingBytes(); } return this->pOutputBuffer_->Read(destination.ptr, destination.length, destination.ptr == nullptr); } bool BaseStream::GoBackByProcessedN(AuUInt32 dwOffset) { AU_LOCK_GUARD(this->_spinlock); return this->pOutputBuffer_->ReaderTryGoBack(dwOffset); } bool BaseStream::GoForwardByProcessedN(AuUInt32 dwOffset) { AU_LOCK_GUARD(this->_spinlock); if (!dwOffset) { return true; } return this->pOutputBuffer_->ReaderTryGoForward(dwOffset); } AuStreamReadWrittenPair_t BaseStream::Ingest(AuUInt32 dwBytesFromUnprocessedInputSource) { AU_LOCK_GUARD(this->_spinlock); if (!dwBytesFromUnprocessedInputSource) { return {}; } return Ingest_s(dwBytesFromUnprocessedInputSource); } bool BaseStream::IsValid() { return this->uBufferSize_ ? (this->pOutputBuffer_ && this->pOutputBuffer_->IsValid()) : true; } AuSPtr BaseStream::GetBuffer() { return this->pOutputBuffer_; } void BaseStream::SetBuffer(const AuSPtr &pBuffer) { if (!pBuffer) { this->pOutputBuffer_ = AuUnsafeRaiiToShared(&this->_outbufferOwned); } else { this->pOutputBuffer_ = pBuffer; } } bool BaseStream::Write(const void *pDest, AuUInt32 dwLength) { return this->pOutputBuffer_->Write(AuReinterpretCast(pDest), dwLength) == dwLength; } AuUInt32 BaseStream::GetInternalBufferSize() { return this->pOutputBuffer_->allocSize; } bool BaseStream::Flush() { return false; } bool BaseStream::Finish() { return false; } }