[+] Optimize half of the compression cpy operation

This commit is contained in:
Reece Wilson 2023-06-05 14:55:40 +01:00
parent 07ce6d8974
commit aa7f783fe9
13 changed files with 161 additions and 71 deletions

View File

@ -67,9 +67,8 @@ namespace Aurora::Compression
/**
* @brief Internal output buffer size.
* Internal swap page is undefined.
*/
AuUInt32 uInternalStreamSize { 10 * 4096 };
AuUInt32 uInternalStreamSize { 512 * 1024 };
AuUInt8 uThreads {1};
@ -96,9 +95,9 @@ namespace Aurora::Compression
ECompressionType alg { ECompressionType::eDeflate };
/**
* @brief Internal output buffer size. Internal swap page is undefined.
* @brief Internal output buffer size.
*/
AuUInt32 uInternalStreamSize { 10 * 4096 };
AuUInt32 uInternalStreamSize { 512 * 1024 };
/**
* @brief Flag for headerless decompression streams

View File

@ -122,11 +122,56 @@ namespace Aurora::Compression
}
}
void BaseStream::InitByDesc(CompressInfo &info)
{
}
void BaseStream::InitByDesc(DecompressInfo &info)
{
}
bool BaseStream::Write(const void *pDest, AuUInt32 dwLength)
{
if (this->pOutputBufferInterface_)
{
AuUInt uWritten {};
AuIO::WriteAll(this->pOutputBufferInterface_.get(), AuMemoryViewStreamRead(AuMemoryViewRead((char *)pDest, dwLength), uWritten));
return uWritten == dwLength;
}
return this->pOutputBuffer_->Write(AuReinterpretCast<const AuUInt8 *>(pDest), dwLength) == dwLength;
}
bool BaseStream::Write2(const void *pDest, AuUInt32 dwLength)
{
if (this->pOutputBuffer_)
{
if (this->pOutputBuffer_->CanWrite(dwLength))
{
this->pOutputBuffer_->writePtr += dwLength;
return true;
}
SysUnreachable();
}
else
{
return this->Write(pDest, dwLength) == dwLength;
}
}
AuPair<char *, AuUInt32> BaseStream::GetDOutPair()
{
if (this->pOutputBuffer_)
{
auto view = this->pOutputBuffer_->GetNextLinearWrite();
return AuMakePair((char *)view.ptr, view.length);
}
else
{
return AuMakePair((char *)this->internalOutBuffer_, this->internalOutLength_);
}
}
AuUInt32 BaseStream::GetInternalBufferSize()
{
return this->pOutputBuffer_->allocSize;

View File

@ -33,6 +33,9 @@ namespace Aurora::Compression
virtual AuUInt32 GetInternalBufferSize() override;
bool Write(const void *pDest, AuUInt32 dwLength);
bool Write2(const void *pDest, AuUInt32 dwLength);
AuPair<char *, AuUInt32> GetDOutPair();
virtual AuStreamReadWrittenPair_t Ingest(AuUInt32 bytesFromUnprocessedInputSource) override;
@ -44,10 +47,15 @@ namespace Aurora::Compression
AuSPtr<Memory::ByteBuffer> GetBuffer();
void SetBuffer(const AuSPtr<Memory::ByteBuffer> &pBuffer);
void InitByDesc(CompressInfo &info);
void InitByDesc(DecompressInfo &info);
protected:
virtual AuStreamReadWrittenPair_t Ingest_s(AuUInt32 dwBytesFromUnprocessedInputSource) = 0;
AuSPtr<Memory::ByteBuffer> pOutputBuffer_;
AuSPtr<AuIO::IStreamWriter> pOutputBufferInterface_;
AuSPtr<Memory::ByteBuffer> pFallbackTempBuffer_;
Memory::ByteBuffer _outbufferOwned;
AuThreadPrimitives::SpinLock _spinlock;
AuUInt32 uBufferSize_;

View File

@ -15,14 +15,19 @@ namespace Aurora::Compression
template<typename T, AuUInt Z>
inline void SetArray(T(&array)[Z]);
template<typename T, AuUInt Z>
inline void SetOutArray(T(&array)[Z]);
inline void SetPointer(void *pointer, AuUInt32 dwLength);
template<typename T, typename Z>
inline AuUInt32 IngestForInPointer(const AuSPtr<IO::IStreamReader> &reader, T *&in, Z &inAlreadyAvailable, AuUInt32 dwAmount);
private:
protected:
AuUInt8 *internalInBuffer_ {};
AuUInt32 internalInLength_ {};
AuUInt8 *internalOutBuffer_ {};
AuUInt32 internalOutLength_ {};
};
}

View File

@ -16,6 +16,13 @@ namespace Aurora::Compression
this->internalInLength_ = Z;
}
template<typename T, AuUInt Z>
void IngestableReadBase::SetOutArray(T(&array)[Z])
{
this->internalOutBuffer_ = AuReinterpretCast<AuUInt8 *>(array);
this->internalOutLength_ = Z;
}
void IngestableReadBase::SetPointer(void *pHead, AuUInt32 dwLength)
{
this->internalInBuffer_ = AuReinterpretCast<AuUInt8 *>(pHead);

View File

@ -64,6 +64,7 @@ namespace Aurora::Compression
this->bInit_ = true;
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -87,8 +88,10 @@ namespace Aurora::Compression
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = pMainDOut;
auto ret = BZ2_bzCompress(&this->ctx_, BZ_RUN);
if (ret < Z_OK)
@ -98,17 +101,16 @@ namespace Aurora::Compression
return AuMakePair(read, 0);
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
done += have;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut),
have))
{
this->pReader_.reset();
SysPushErrorIO("Compression Out of Overhead");
return AuMakePair(read, 0);
}
}
while (this->ctx_.avail_out == 0);
}
@ -137,8 +139,10 @@ namespace Aurora::Compression
{
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = pMainDOut;
auto ret = BZ2_bzCompress(&this->ctx_, type);
if (ret < Z_OK)
@ -148,9 +152,9 @@ namespace Aurora::Compression
return false;
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
if (!Write(reinterpret_cast<const AuUInt8*>(this->dout_),
if (!Write2(pMainDOut,
have))
{
this->pReader_.reset();
@ -166,8 +170,9 @@ namespace Aurora::Compression
{
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = pMainDOut;
auto ret = BZ2_bzCompress(&this->ctx_, type);
if (ret < Z_OK)
@ -177,9 +182,9 @@ namespace Aurora::Compression
return false;
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut),
have))
{
this->pReader_.reset();

View File

@ -65,6 +65,7 @@ namespace Aurora::Compression
this->bInit_ = true;
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -79,6 +80,8 @@ namespace Aurora::Compression
while (read < input)
{
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
read += IngestForInPointer<char, uInt>(this->pReader_, this->ctx_.next_in, this->ctx_.avail_in, input - read);
if (!this->ctx_.avail_in)
@ -88,8 +91,8 @@ namespace Aurora::Compression
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = pMainDOut;
auto ret = BZ2_bzDecompress(&this->ctx_);
if (ret < Z_OK)
@ -99,10 +102,10 @@ namespace Aurora::Compression
return AuMakePair(read, 0);
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
done += have;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
if (!Write2(pMainDOut,
have))
{
SysPushErrorIO("Compression Out of Overhead");

View File

@ -43,7 +43,9 @@ namespace Aurora::Compression
return false;
}
this->InitByDesc(this->meta);
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -70,12 +72,13 @@ namespace Aurora::Compression
return { read, done };
}
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
size_t outNext {};
uint8_t *pOut {};
do
{
outNext = AuArraySize(this->dout_);
pOut = this->dout_;
outNext = uMainDOutLength;
pOut = (uint8_t *)pMainDOut;
auto ret = BrotliEncoderCompressStream(this->pState, BrotliEncoderOperation::BROTLI_OPERATION_PROCESS, &this->uAvailIn, (const uint8_t **) & this->pInBuffer, &outNext, &pOut, nullptr);
if (ret == BROTLI_FALSE)
@ -85,10 +88,10 @@ namespace Aurora::Compression
return AuMakePair(read, 0);
}
auto have = AuArraySize(this->dout_) - outNext;
auto have = uMainDOutLength - outNext;
done += have;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_), have))
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut), have))
{
this->pReader_.reset();
SysPushErrorIO("Compression Out of Overhead");
@ -128,8 +131,10 @@ namespace Aurora::Compression
uint8_t *pOut {};
while (outNext == 0 || BrotliEncoderHasMoreOutput(this->pState));
{
outNext = AuArraySize(this->dout_);
pOut = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
outNext = uMainDOutLength;
pOut = (uint8_t *)pMainDOut;
auto ret = BrotliEncoderCompressStream(this->pState, type, &this->uAvailIn, (const uint8_t **)&this->pInBuffer, &outNext, &pOut, nullptr);
if (ret == BROTLI_FALSE)
@ -139,8 +144,8 @@ namespace Aurora::Compression
return false;
}
auto have = AuArraySize(this->dout_) - outNext;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_), have))
auto have = uMainDOutLength - outNext;
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut), have))
{
this->pReader_.reset();
SysPushErrorIO("Compression Out of Overhead");

View File

@ -45,6 +45,7 @@ namespace Aurora::Compression
}
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -70,8 +71,10 @@ namespace Aurora::Compression
uint8_t *pOut {};
do
{
outNext = AuArraySize(this->dout_);
pOut = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
outNext = uMainDOutLength;
pOut = (uint8_t *)pMainDOut;
auto ret = BrotliDecoderDecompressStream(this->pState, &this->uAvailIn, (const uint8_t **)&this->pInBuffer, &outNext, &pOut, nullptr);
if (ret == BROTLI_DECODER_RESULT_ERROR)
@ -81,10 +84,10 @@ namespace Aurora::Compression
return AuMakePair(read, 0);
}
auto have = AuArraySize(this->dout_) - outNext;
auto have = uMainDOutLength - outNext;
done += have;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_), have))
if (!Write2(pMainDOut, have))
{
this->pReader_.reset();
SysPushErrorIO("Compression Out of Overhead");

View File

@ -48,6 +48,7 @@ namespace Aurora::Compression
this->init_ = true;
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -71,8 +72,9 @@ namespace Aurora::Compression
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = (Bytef *)pMainDOut;
if (!this->ctx_.avail_out)
{
@ -87,10 +89,10 @@ namespace Aurora::Compression
return AuMakePair(read, 0);
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
done += have;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut),
have))
{
this->pReader_.reset();
@ -126,8 +128,9 @@ namespace Aurora::Compression
{
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = (Bytef *)pMainDOut;
auto ret = deflate(&this->ctx_, Z_FULL_FLUSH);
if (ret < Z_OK)
@ -137,10 +140,9 @@ namespace Aurora::Compression
return false;
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
have))
if (!Write2(reinterpret_cast<const AuUInt8 *>(this->dout_), have))
{
this->pReader_.reset();
SysPushErrorIO("Compression Out of Overhead");
@ -154,8 +156,9 @@ namespace Aurora::Compression
{
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = (Bytef *)pMainDOut;
auto ret = deflate(&this->ctx_, type);
if (ret < Z_OK)
@ -165,10 +168,9 @@ namespace Aurora::Compression
return false;
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
have))
if (!Write2(pMainDOut, have))
{
this->pReader_.reset();
SysPushErrorIO("Compression Out of Overhead");

View File

@ -54,6 +54,7 @@ namespace Aurora::Compression
this->init_ = true;
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -77,8 +78,9 @@ namespace Aurora::Compression
do
{
this->ctx_.avail_out = AuArraySize(this->dout_);
this->ctx_.next_out = this->dout_;
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
this->ctx_.avail_out = uMainDOutLength;
this->ctx_.next_out = (Bytef *)pMainDOut;
if (!this->ctx_.avail_out)
{
@ -93,10 +95,10 @@ namespace Aurora::Compression
return AuMakePair(read, 0);
}
auto have = AuArraySize(this->dout_) - this->ctx_.avail_out;
auto have = uMainDOutLength - this->ctx_.avail_out;
done += have;
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
if (!Write2(pMainDOut,
have))
{
this->pReader_.reset();

View File

@ -67,7 +67,8 @@ namespace Aurora::Compression
this->pIterator_ = this->din_;
this->uAvailableIn_ = 0;
SetArray(this->din_);
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -101,7 +102,9 @@ namespace Aurora::Compression
while ((this->input_.pos != this->input_.size) ||
(/*bLastFrame && uRet*/ false))
{
ZSTD_outBuffer output = { this->dout_, uOutFrameLength, 0 };
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
ZSTD_outBuffer output = { pMainDOut, uMainDOutLength, 0 };
ZSTD_EndDirective mode = ZSTD_e_continue;// bLastFrame ? ZSTD_e_flush : ZSTD_e_continue;
uRet = ZSTD_compressStream2(this->cctx_, &output, &this->input_, mode);
@ -122,7 +125,7 @@ namespace Aurora::Compression
done += AuUInt32(output.pos);
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut),
AuUInt32(output.pos)))
{
SysPushErrorIO("Compression Out of Overhead");
@ -150,7 +153,8 @@ namespace Aurora::Compression
(this->bLastFrameHasNotFinished_) ||
(uRet))
{
ZSTD_outBuffer output = { this->dout_, uOutFrameLength, 0 };
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
ZSTD_outBuffer output = { pMainDOut, uMainDOutLength, 0 };
this->bLastFrameHasNotFinished_ = false;
@ -167,7 +171,7 @@ namespace Aurora::Compression
continue;
}
if (!Write(reinterpret_cast<const AuUInt8 *>(this->dout_),
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut),
AuUInt32(output.pos)))
{
SysPushErrorIO("Compression Out of Overhead");

View File

@ -46,7 +46,8 @@ namespace Aurora::Compression
this->pIterator_ = this->din_;
this->uAvailableIn_ = 0;
SetArray(this->din_);
this->SetArray(this->din_);
this->SetOutArray(this->dout_);
return true;
}
@ -74,7 +75,8 @@ namespace Aurora::Compression
while (this->input_.pos < this->input_.size)
{
ZSTD_outBuffer output = {this->dout_, uOutFrameLength, 0};
auto [pMainDOut, uMainDOutLength] = this->GetDOutPair();
ZSTD_outBuffer output = { pMainDOut, uMainDOutLength, 0 };
auto ret = ZSTD_decompressStream(this->dctx_, &output, &this->input_);
if (ZSTD_isError(ret))
@ -86,7 +88,7 @@ namespace Aurora::Compression
done += AuUInt32(output.pos);
if (!Write(reinterpret_cast<const AuUInt8 *>(output.dst),
if (!Write2(reinterpret_cast<const AuUInt8 *>(pMainDOut),
AuUInt32(output.pos)))
{
SysPushErrorIO("Compression Out of Overhead");