600 lines
17 KiB
C++
600 lines
17 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: AuIOPipeProcessor.cpp
|
|
Date: 2022-6-6
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include <Aurora/IO/IOExperimental.hpp>
|
|
#include "AuIOProcessor.hpp"
|
|
#include "AuIOPipeProcessor.hpp"
|
|
#include <Aurora/IO/Protocol/Protocol.hpp>
|
|
|
|
namespace Aurora::IO
|
|
{
|
|
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestAIO &request) :
|
|
parent_(parent),
|
|
request(request),
|
|
startCallback(this),
|
|
endCallback(this),
|
|
output(request.output)
|
|
{
|
|
this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize;
|
|
this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize;
|
|
this->uBytesWrittenLimit_ = request.uLengthOrZero;
|
|
this->uBytesWrittenTarget_ = request.uMinBytesToRead ? request.uMinBytesToRead : request.uLengthOrZero;
|
|
this->pAsyncTransaction_ = request.pAsyncTransaction;
|
|
this->pAsyncAdapter_ = NewAsyncStreamAdapter(request.pAsyncTransaction, request.bIsStream);
|
|
SysAssert(this->pAsyncAdapter_);
|
|
this->pAsyncAdapter_->SetReadOffset(request.uStartOffset);
|
|
this->pAsyncAdapter_->SetWriteOffset(request.uStartOffset);
|
|
this->pAsyncStreamReader_ = this->pAsyncAdapter_->ToStreamReader();
|
|
}
|
|
|
|
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request) :
|
|
parent_(parent),
|
|
request(request),
|
|
startCallback(this),
|
|
endCallback(this),
|
|
output(request.output)
|
|
{
|
|
this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize;
|
|
this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize;
|
|
this->uBytesWrittenLimit_ = request.uLengthOrZero;
|
|
this->uBytesWrittenTarget_ = request.uMinBytesToRead ? request.uMinBytesToRead : request.uLengthOrZero;
|
|
}
|
|
|
|
void IOPipeWork::Tick_FrameEpilogue()
|
|
{
|
|
if (AuExchange(this->bShouldReadNext, false))
|
|
{
|
|
this->ReadNext();
|
|
}
|
|
}
|
|
|
|
void IOPipeWork::Tick_Any()
|
|
{
|
|
if (this->pAsyncTransaction_)
|
|
{
|
|
this->AsyncPump();
|
|
}
|
|
else
|
|
{
|
|
this->StreamPump();
|
|
}
|
|
}
|
|
|
|
void IOPipeWork::OnFailureCompletion()
|
|
{
|
|
this->TerminateOnThread(true);
|
|
}
|
|
|
|
void IOPipeWork::OnNominalCompletion()
|
|
{
|
|
this->TerminateOnThread();
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////
|
|
|
|
IOWorkStart::IOWorkStart(IOPipeWork *parent) : parent(parent)
|
|
{
|
|
|
|
}
|
|
|
|
void IOWorkStart::OnRun()
|
|
{
|
|
parent->RunOnThread();
|
|
}
|
|
|
|
void IOWorkStart::OnCanceled()
|
|
{
|
|
parent->TerminateOnThread();
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////
|
|
|
|
IOWorkEnd::IOWorkEnd(IOPipeWork *parent) : parent(parent)
|
|
{
|
|
|
|
}
|
|
|
|
void IOWorkEnd::OnRun()
|
|
{
|
|
parent->TerminateOnThread();
|
|
}
|
|
|
|
void IOWorkEnd::OnCanceled()
|
|
{
|
|
parent->TerminateOnThread();
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////
|
|
|
|
bool IOPipeWork::Start()
|
|
{
|
|
AuSPtr<IIOProcessorItem> ret;
|
|
|
|
if (this->pAsyncTransaction_)
|
|
{
|
|
auto pWaitable = this->pAsyncAdapter_->ToWaitable();
|
|
if (pWaitable)
|
|
{
|
|
ret = this->parent_->parent_->StartIOWatch(pWaitable, AuSharedFromThis());
|
|
if (!ret)
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ret = this->parent_->parent_->StartIOWatch(this->input_.pWatchItem, AuSharedFromThis());
|
|
|
|
if (!ret)
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
this->pWatch = ret;
|
|
|
|
if (this->parent_->parent_->CheckThread())
|
|
{
|
|
RunOnThread();
|
|
return true;
|
|
}
|
|
|
|
return this->parent_->
|
|
parent_->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
|
|
&this->startCallback));
|
|
}
|
|
|
|
bool IOPipeWork::End()
|
|
{
|
|
if (this->parent_->parent_->CheckThread())
|
|
{
|
|
TerminateOnThread();
|
|
return true;
|
|
}
|
|
|
|
return this->parent_->
|
|
parent_->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
|
|
&this->endCallback));
|
|
}
|
|
|
|
AuInt64 IOPipeWork::GetLastTickMS()
|
|
{
|
|
return this->throughput_.GetLastFrameTimeWall();
|
|
}
|
|
|
|
AuInt64 IOPipeWork::GetStartTickMS()
|
|
{
|
|
return this->iStartTickMS_;
|
|
}
|
|
|
|
double IOPipeWork::GetPredictedThroughput()
|
|
{
|
|
return this->throughput_.GetEstimatedHertz();
|
|
}
|
|
|
|
AuUInt64 IOPipeWork::GetBytesProcessed()
|
|
{
|
|
return this->uBytesWritten_;
|
|
}
|
|
|
|
AuUInt64 IOPipeWork::GetBytesProcessedInterframe()
|
|
{
|
|
return this->uBytesWritten_ + this->bytesProcessedInterframe_;
|
|
}
|
|
|
|
void IOPipeWork::PrepareStream()
|
|
{
|
|
if (!this->buffer_.IsEmpty())
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (!this->buffer_.Allocate(this->uBufferSize_, AuHwInfo::GetPageSize(), true))
|
|
{
|
|
SysPushErrorMem();
|
|
TerminateOnThread(true);
|
|
return;
|
|
}
|
|
}
|
|
|
|
void IOPipeWork::PrepareAsync()
|
|
{
|
|
PrepareStream();
|
|
ReadNext();
|
|
}
|
|
|
|
void IOPipeWork::AsyncPump()
|
|
{
|
|
AuMemoryViewWrite internalBuffer;
|
|
|
|
auto err = this->pAsyncStreamReader_->Dequeue(0, internalBuffer);
|
|
if (err != EStreamError::eErrorNone)
|
|
{
|
|
SysPushErrorIO("Async Stream Error: {}", err);
|
|
TerminateOnThread(true);
|
|
return;
|
|
}
|
|
|
|
if (internalBuffer.length == 0)
|
|
{
|
|
// end of stream
|
|
TerminateOnThread(false);
|
|
return;
|
|
}
|
|
|
|
err = this->pAsyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer);
|
|
if (err != EStreamError::eErrorNone)
|
|
{
|
|
SysPushErrorIO("Async Stream Error: {}", err);
|
|
TerminateOnThread(true);
|
|
return;
|
|
}
|
|
|
|
this->buffer_.writePtr += internalBuffer.length;
|
|
|
|
TryPump();
|
|
}
|
|
|
|
void IOPipeWork::StreamPump()
|
|
{
|
|
AuUInt canBuffer = this->buffer_.RemainingWrite();
|
|
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
|
|
|
|
AuUInt read {};
|
|
try
|
|
{
|
|
if (this->input_.pReader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) !=
|
|
AuIO::EStreamError::eErrorNone)
|
|
{
|
|
TerminateOnThread();
|
|
return;
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
|
|
this->buffer_.writePtr += read;
|
|
if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length)
|
|
{
|
|
this->buffer_.writePtr = this->buffer_.base;
|
|
}
|
|
|
|
TryPump();
|
|
}
|
|
|
|
void IOPipeWork::ReadNextAsync()
|
|
{
|
|
try
|
|
{
|
|
AuUInt canBuffer = this->buffer_.RemainingWrite();
|
|
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
|
|
canBuffer = AuMin(AuUInt(this->uFrameCap_), canBuffer);
|
|
|
|
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer);
|
|
|
|
if (this->pAsyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
|
|
AuIO::EStreamError::eErrorNone)
|
|
{
|
|
TerminateOnThread(true);
|
|
return;
|
|
}
|
|
|
|
if (!this->nextWriteAsync_)
|
|
{
|
|
TerminateOnThread();
|
|
return;
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
}
|
|
|
|
void IOPipeWork::ReadNext()
|
|
{
|
|
if (!this->bActive)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (IsAtRequestedEnd())
|
|
{
|
|
TerminateOnThread(false);
|
|
return;
|
|
}
|
|
|
|
if (this->pAsyncTransaction_)
|
|
{
|
|
ReadNextAsync();
|
|
}
|
|
else
|
|
{
|
|
if (this->input_.pBackend)
|
|
{
|
|
this->input_.pBackend->OnEndPump();
|
|
}
|
|
}
|
|
}
|
|
|
|
AuUInt32 IOPipeWork::TryPump()
|
|
{
|
|
AuUInt &bytesProcessedTotal = this->bytesProcessedInterframe_;
|
|
AuUInt bytesProcessed {};
|
|
bool bIsCullingLastFrame {};
|
|
|
|
bytesProcessedTotal = 0;
|
|
|
|
do
|
|
{
|
|
AuUInt canRead2 = this->buffer_.RemainingBytes();
|
|
AuUInt canRead = canRead2;
|
|
if (!canRead)
|
|
{
|
|
break;
|
|
}
|
|
|
|
canRead = AuMin<AuUInt>(canRead, (this->buffer_.length + this->buffer_.base) - this->buffer_.readPtr);
|
|
if (!canRead)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
auto oldReadHeadPtr = this->buffer_.readPtr;
|
|
auto readHead = oldReadHeadPtr - this->buffer_.base;
|
|
|
|
auto oldWriteHeadPtr = this->buffer_.writePtr;
|
|
auto writeHead = oldWriteHeadPtr - this->buffer_.base;
|
|
|
|
auto uInterframeProgress = this->GetBytesProcessedInterframe();
|
|
|
|
if (bIsCullingLastFrame = (this->uBytesWrittenLimit_ && canRead2 + uInterframeProgress > this->uBytesWrittenLimit_))
|
|
{
|
|
auto uLastFrameBytes = this->uBytesWrittenLimit_ - uInterframeProgress;
|
|
auto uAbsDataToRead = AuMin<AuUInt>(canRead, uLastFrameBytes);
|
|
this->buffer_.writePtr = this->buffer_.readPtr + uAbsDataToRead;
|
|
}
|
|
|
|
if (pProtocolStack)
|
|
{
|
|
pProtocolStack->DoTick();
|
|
}
|
|
|
|
try
|
|
{
|
|
if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart)
|
|
{
|
|
if (this->output.handleBufferedStream.pOnData)
|
|
{
|
|
if (!this->output.handleBufferedStream.pOnData->OnDataAvailable(this->buffer_))
|
|
{
|
|
bytesProcessed = 0;
|
|
this->buffer_.readPtr = this->buffer_.base + readHead;
|
|
}
|
|
else
|
|
{
|
|
bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
|
|
}
|
|
}
|
|
else if (this->output.type == EPipeCallbackType::eWriteToWriter)
|
|
{
|
|
if (this->output.forwardStream.pIntercepter)
|
|
{
|
|
if (!this->output.forwardStream.pIntercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.pWriter))
|
|
{
|
|
bytesProcessed = 0;
|
|
}
|
|
else
|
|
{
|
|
this->buffer_.readPtr += bytesProcessed;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
|
|
}
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
|
|
bytesProcessedTotal += bytesProcessed;
|
|
|
|
if (oldWriteHeadPtr != this->buffer_.writePtr)
|
|
{
|
|
this->buffer_.writePtr = this->buffer_.base + writeHead;
|
|
}
|
|
|
|
if (bIsCullingLastFrame)
|
|
{
|
|
this->bShouldReadNext = false;
|
|
break;
|
|
}
|
|
|
|
if (this->buffer_.readPtr == this->buffer_.writePtr)
|
|
{
|
|
this->buffer_.readPtr = this->buffer_.base;
|
|
this->buffer_.writePtr = this->buffer_.base;
|
|
|
|
this->bShouldReadNext = true;
|
|
}
|
|
else if (!bytesProcessed)
|
|
{
|
|
this->bShouldReadNext = true;
|
|
}
|
|
|
|
// Prevent fucky end of allocation issues by moving the tail end of a partially buffered stream back to the start
|
|
|
|
// Should help with packing massive files, where faster disks can spin through smaller frames, leaving
|
|
// the CPU to catch up towards the end of the buffer, at which point the linearity breaks.
|
|
// We must instead force linearity, and with the assumption we can move peekable memory around, we must eventually
|
|
// move the tail end of the buffer back to the start, just so we can continue that stream view linearity.
|
|
|
|
// I really don't know how ReadNextAsync can be expected to wrap around a ring buffer.
|
|
// We'd need to know if this pass failed, and if the read head is near the end, it'd know to wrap back around to zero.
|
|
// An overengineered pain and liability.
|
|
|
|
// This should work
|
|
|
|
if (readHead > (this->buffer_.length / 4) * 3)
|
|
{
|
|
auto readPtr = this->buffer_.base + readHead;
|
|
auto len = this->buffer_.writePtr - readPtr;
|
|
AuMemmove(this->buffer_.base, readPtr, len);
|
|
this->buffer_.writePtr = this->buffer_.base + len;
|
|
this->buffer_.readPtr = this->buffer_.base;
|
|
}
|
|
|
|
if (this->output.type == EPipeCallbackType::eWriteToWriter)
|
|
{
|
|
if (this->output.forwardStream.bFlushWriter)
|
|
{
|
|
if (this->output.forwardStream.pWriter)
|
|
{
|
|
this->output.forwardStream.pWriter->Flush();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
while (AuExchange(bytesProcessed, 0));
|
|
|
|
if (!bIsCullingLastFrame)
|
|
{
|
|
if (this->buffer_.readPtr == this->buffer_.base)
|
|
{
|
|
this->bShouldReadNext = true;
|
|
}
|
|
}
|
|
|
|
this->uBytesWritten_ += bytesProcessedTotal;
|
|
|
|
this->throughput_.OnUpdate(bytesProcessedTotal);
|
|
|
|
if (this->request.pListener)
|
|
{
|
|
this->request.pListener->OnPipePartialEvent(bytesProcessedTotal);
|
|
}
|
|
|
|
if (bIsCullingLastFrame)
|
|
{
|
|
TerminateOnThread(false);
|
|
}
|
|
|
|
return bytesProcessedTotal;
|
|
}
|
|
|
|
bool IOPipeWork::IsAtRequestedEnd()
|
|
{
|
|
return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_);
|
|
}
|
|
|
|
AuByteBuffer *IOPipeWork::GetBuffer()
|
|
{
|
|
return &this->buffer_;
|
|
}
|
|
|
|
void IOPipeWork::RunOnThread()
|
|
{
|
|
if (this->input_.pBackend)
|
|
{
|
|
this->input_.pBackend->OnStart();
|
|
}
|
|
|
|
if (this->pAsyncTransaction_)
|
|
{
|
|
PrepareAsync();
|
|
}
|
|
else
|
|
{
|
|
PrepareStream();
|
|
}
|
|
}
|
|
|
|
void IOPipeWork::TerminateOnThread(bool error)
|
|
{
|
|
if (!this->bActive)
|
|
{
|
|
return;
|
|
}
|
|
|
|
this->bActive = false;
|
|
|
|
if (this->pWatch)
|
|
{
|
|
this->pWatch->StopWatch();
|
|
}
|
|
|
|
if (this->request.pListener)
|
|
{
|
|
if (error)
|
|
{
|
|
// We explicitly failed...
|
|
this->request.pListener->OnPipeFailureEvent();
|
|
}
|
|
else if (this->uBytesWrittenTarget_ && (this->uBytesWrittenTarget_ > this->uBytesWritten_))
|
|
{
|
|
// Finished without error early
|
|
this->request.pListener->OnPipeFailureEvent();
|
|
}
|
|
else
|
|
{
|
|
// We finished...
|
|
this->request.pListener->OnPipeSuccessEvent();
|
|
}
|
|
|
|
this->request.pListener.reset();
|
|
}
|
|
|
|
this->output.handleBufferedStream.pOnData.reset();
|
|
this->output.forwardStream.pIntercepter.reset();
|
|
this->output.forwardStream.pWriter.reset();
|
|
|
|
if (this->input_.pBackend)
|
|
{
|
|
this->input_.pBackend->OnEnd();
|
|
this->input_.pBackend.reset();
|
|
}
|
|
|
|
if (auto transaction = this->pAsyncTransaction_)
|
|
{
|
|
transaction->Reset();
|
|
this->pAsyncTransaction_.reset();
|
|
}
|
|
|
|
this->pAsyncAdapter_.reset();
|
|
this->pAsyncStreamReader_.reset();
|
|
}
|
|
|
|
IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) :
|
|
parent_(parent)
|
|
{
|
|
|
|
}
|
|
|
|
AuSPtr<IIOPipeWork> IOPipeProcessor::NewBasicPipe(const IOPipeRequestBasic &request)
|
|
{
|
|
// TODO: Incomplete
|
|
return AuMakeShared<IOPipeWork>(AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor()), request);
|
|
}
|
|
|
|
AuSPtr<IIOPipeWork> IOPipeProcessor::NewAIOPipe(const IOPipeRequestAIO &request)
|
|
{
|
|
// TODO: Incomplete
|
|
return AuMakeShared<IOPipeWork>(AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor()), request);
|
|
}
|
|
} |