AuroraRuntime/Source/Compression/AuCompressionInterceptor.cpp

209 lines
5.9 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuCompressionInterceptor.cpp
Date: 2022-9-14
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuCompression.hpp"
#include "AuBaseStream.hpp"
#include "AuCompressionInterceptor.hpp"
namespace Aurora::Compression
{
// Nothing to set up here; the streams are attached later through Init().
CompressionInterceptor::CompressionInterceptor() = default;
/**
 * @brief Attaches the compression stream this interceptor drives.
 * @param pStream     stream handle stored in pStream_
 * @param pBaseStream stream handle stored in pBaseStream_; this is the object
 *                    OnDataAvailable() calls Ingest/Flush/SetBuffer on
 */
void CompressionInterceptor::Init(const AuSPtr<ICompressionStream> &pStream,
                                  const AuSPtr<BaseStream> &pBaseStream)
{
    this->pStream_     = pStream;
    this->pBaseStream_ = pBaseStream;
}
/**
 * @brief Reports the open state of this reader.
 * @return always IO::EStreamError::eErrorNone
 */
IO::EStreamError CompressionInterceptor::IsOpen()
{
    const auto eStatus = IO::EStreamError::eErrorNone;
    return eStatus;
}
/**
 * @brief Pulls bytes out of the input buffer of the frame currently being
 *        processed by OnDataAvailable().
 *
 * When a limit is configured (uCountMax_ != 0) the read is clamped so that the
 * running total uCount_ never exceeds uCountMax_.
 *
 * @param parameters  parameters.ptr / parameters.length describe the destination;
 *                    parameters.outVariable receives the number of bytes read
 * @return eErrorEndOfStream when zero bytes were produced (no active frame,
 *         exhausted limit, or empty input), otherwise eErrorNone
 */
IO::EStreamError CompressionInterceptor::Read(const Memory::MemoryViewStreamWrite &parameters)
{
    // pLastBuffer_ is only set for the duration of OnDataAvailable(); a read issued
    // outside of that window has nothing to consume and must not dereference null.
    if (!this->pLastBuffer_)
    {
        parameters.outVariable = 0;
        return IO::EStreamError::eErrorEndOfStream;
    }

    AuUInt uRemainingBytes = parameters.length;

    if (this->uCountMax_)
    {
        if (this->uCount_ >= this->uCountMax_)
        {
            // The limit may have been lowered below the running count via LimitSet();
            // avoid the unsigned wrap-around of (uCountMax_ - uCount_) in that case.
            uRemainingBytes = 0;
        }
        else
        {
            uRemainingBytes = AuMin(this->uCountMax_ - this->uCount_, parameters.length);
        }
    }

    parameters.outVariable = this->pLastBuffer_->Read(parameters.ptr, uRemainingBytes);
    this->uCount_ += parameters.outVariable;

    if (parameters.outVariable == 0)
    {
        return IO::EStreamError::eErrorEndOfStream;
    }

    return IO::EStreamError::eErrorNone;
}
/**
 * @brief Close request for this reader; intentionally a no-op.
 */
void CompressionInterceptor::Close()
{
    // Nothing to release here.
}
/**
 * @brief Reports whether the error flag has been raised.
 * @return current value of bErrorFlag_ (set by OnDataAvailable() when ingestion
 *         fails before the limit has been reached)
 */
bool CompressionInterceptor::HasFailed()
{
    const bool bFailed = this->bErrorFlag_;
    return bFailed;
}
bool CompressionInterceptor::OnDataAvailable(const AuSPtr<Memory::ByteBuffer> &pReadInByteBuffer,
const AuSPtr<Memory::ByteBuffer> &pWriteOutByteBuffer,
const AuSPtr<IO::Protocol::IProtocolPiece> &pProtocolPiece)
{
this->pLastBuffer_ = pReadInByteBuffer;
this->pBaseStream_->SetWeakBuffer(pReadInByteBuffer);
this->pBaseStream_->SetBuffer(pWriteOutByteBuffer);
if (this->LimitHasHit())
{
if (!this->bPassthrough_)
{
this->pBaseStream_->SetWeakBuffer({});
return true;
}
auto uCount = pWriteOutByteBuffer->WriteFromEx(*pReadInByteBuffer, AuNumericLimits<AuUInt>::max());
pReadInByteBuffer->readPtr += uCount;
this->pBaseStream_->SetWeakBuffer({});
return bool(uCount);
}
bool bSuccess { true };
do
{
auto uReadable = pReadInByteBuffer->RemainingBytes();
if (!uReadable)
{
break;
}
auto [a, b] = this->pBaseStream_->Ingest(uReadable);
bSuccess = a != 0;
}
while (bSuccess);
this->pBaseStream_->SetWeakBuffer({});
if ((this->bAutoFlush_) ||
(AuExchange(this->bSendFlush_, false)))
{
if (!this->pBaseStream_->Flush())
{
SysPushErrorIO("Couldn't flush stream. I hope this means no data was available/already flushed quirk occurred bc im still not setting bErrorFlag_ yet. This message might explain session instability. ");
}
}
this->pLastBuffer_.reset();
this->pBaseStream_->SetBuffer({});
if (this->LimitHasHit())
{
if (pReadInByteBuffer->RemainingBytes())
{
if (!this->bPassthrough_)
{
return true;
}
pReadInByteBuffer->readPtr += pWriteOutByteBuffer->WriteFromEx(*pReadInByteBuffer, AuNumericLimits<AuUInt>::max());
}
}
else
{
if (!bSuccess)
{
this->bErrorFlag_ = true;
}
}
return !this->bErrorFlag_;
}
/**
 * @brief Requests a one-shot flush; OnDataAvailable() consumes the request
 *        after ingesting its next frame.
 */
void CompressionInterceptor::FlushNextFrame()
{
    // Raised here, cleared by the AuExchange in OnDataAvailable().
    this->bSendFlush_ = true;
}
/**
 * @brief Enables or disables flushing the stream after every frame.
 * @param bAutoFlush new setting
 * @return the previous setting
 */
bool CompressionInterceptor::ConfigureAutoFlushPerFrame(bool bAutoFlush)
{
    const bool bPrevious = this->bAutoFlush_;
    this->bAutoFlush_ = bAutoFlush;
    return bPrevious;
}
/**
 * @brief Reports whether the configured byte limit has been reached.
 *
 * A limit of zero means "no limit": Read() skips clamping when uCountMax_ is 0
 * and LimitReset() restores that state. It therefore must never count as hit,
 * otherwise an interceptor without a limit would skip ingestion entirely.
 *
 * @return true when a non-zero limit is set and uCount_ has reached it
 */
bool CompressionInterceptor::LimitHasHit()
{
    if (!this->uCountMax_)
    {
        return false;
    }

    return this->uCount_ >= this->uCountMax_;
}
/**
 * @brief Selects what happens to input that arrives after the limit is reached:
 *        copied verbatim to the output (true) or left in the input buffer (false).
 * @param bPassthrough new setting
 * @return the previous setting
 */
bool CompressionInterceptor::LimitPassthroughOnOverflow(bool bPassthrough)
{
    const bool bPrevious = this->bPassthrough_;
    this->bPassthrough_ = bPassthrough;
    return bPrevious;
}
/**
 * @brief Clears both the configured limit and the running byte count.
 */
void CompressionInterceptor::LimitReset()
{
    this->uCountMax_ = 0;
    this->uCount_    = 0;
}
/**
 * @brief Returns the running byte count accumulated by Read().
 * @return current value of uCount_
 */
AuUInt CompressionInterceptor::LimitGetIndex()
{
    const AuUInt uIndex = this->uCount_;
    return uIndex;
}
/**
 * @brief Sets the byte limit that Read() clamps against.
 * @param uLength new limit; Read() treats zero as "no limit"
 */
void CompressionInterceptor::LimitSet(AuUInt uLength)
{
    // The running count is left untouched; use LimitReset() to clear it.
    this->uCountMax_ = uLength;
}
/**
 * @brief Creates an interceptor backed by a decompressor built from @p info.
 *
 * The decompressor is handed a pointer to the interceptor wrapped by
 * AuUnsafeRaiiToShared(); the interceptor in turn stores the decompressor via Init().
 *
 * @param info decompressor configuration, forwarded to DecompressorShared()
 * @return the interceptor, or an empty pointer (after pushing a memory error)
 *         if either allocation fails
 */
AUKN_SYM AuSPtr<ICompressionInterceptor> NewDecompressionInterceptor(const DecompressInfo &info)
{
    if (auto pInterceptor = AuMakeShared<CompressionInterceptor>())
    {
        if (auto pDecompressor = DecompressorShared(AuUnsafeRaiiToShared(pInterceptor.get()),
                                                    info))
        {
            pInterceptor->Init(pDecompressor,
                               AuStaticCast<Compression::BaseStream>(pDecompressor));
            return pInterceptor;
        }

        // Decompressor could not be created.
        SysPushErrorMemory();
        return {};
    }

    // Interceptor could not be allocated.
    SysPushErrorMemory();
    return {};
}
/**
 * @brief Creates an interceptor backed by a compressor built from @p info.
 *
 * The compressor is handed a pointer to the interceptor wrapped by
 * AuUnsafeRaiiToShared(); the interceptor in turn stores the compressor via Init().
 *
 * @param info compressor configuration, forwarded to CompressorShared()
 * @return the interceptor, or an empty pointer (after pushing a memory error)
 *         if either allocation fails
 */
AUKN_SYM AuSPtr<ICompressionInterceptor> NewCompressionInterceptor(const CompressInfo &info)
{
    if (auto pInterceptor = AuMakeShared<CompressionInterceptor>())
    {
        if (auto pCompressor = CompressorShared(AuUnsafeRaiiToShared(pInterceptor.get()),
                                                info))
        {
            pInterceptor->Init(pCompressor,
                               AuStaticCast<Compression::BaseStream>(pCompressor));
            return pInterceptor;
        }

        // Compressor could not be created.
        SysPushErrorMemory();
        return {};
    }

    // Interceptor could not be allocated.
    SysPushErrorMemory();
    return {};
}
}