AuroraRuntime/Source/IO/IOPipeProcessor.cpp

565 lines
15 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOPipeProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOPipeProcessor.hpp"
#include "IOProcessor.hpp"
namespace Aurora::IO
{
struct IOPipeWork;
// Work unit that forwards to an IOPipeWork: OnRun() calls RunOnThread() on the
// pipe and OnCanceled() calls TerminateOnThread() on it (see the out-of-line
// definitions further down this file).
struct IOWorkStart : IIOProcessorWorkUnit
{
// Stores the given pipe pointer as-is; nothing here takes ownership of it.
IOWorkStart(IOPipeWork *parent);
void OnRun() override;
void OnCanceled() override;
// Raw pointer to the pipe this unit acts on.
IOPipeWork *parent {};
};
// Work unit that shuts an IOPipeWork down: both OnRun() and OnCanceled() call
// TerminateOnThread() on the pipe (see the out-of-line definitions further
// down this file).
struct IOWorkEnd : IIOProcessorWorkUnit
{
// Stores the given pipe pointer as-is; nothing here takes ownership of it.
IOWorkEnd(IOPipeWork *parent);
void OnRun() override;
void OnCanceled() override;
// Raw pointer to the pipe this unit acts on.
IOPipeWork *parent {};
};
// A single pipe job: pulls bytes from either an async transaction or a plain
// stream reader into buffer_ and hands them to the configured output callback.
// Which source is used is decided at runtime by whether asyncTransaction_ is set
// (see Tick_Any, Start, ReadNext and RunOnThread).
struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis<IIOPipeWork>
{
// AIO variant: also creates the async stream adapter/reader for the transaction.
IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestAIO &request);
// Basic variant: only copies sizes/limits and the output callback.
IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request);
// Item returned by StartIOWatch() in Start(); stopped in TerminateOnThread().
AuSPtr<IIOProcessorItem> watch;
// Issues the next read if bShouldReadNext was set since the previous call.
void Tick_FrameEpilogue() override;
// Dispatches to AsyncPump() or StreamPump().
void Tick_Any() override;
// Both completion hooks terminate the pipe; the failure hook passes error = true.
void OnFailureCompletion() override;
void OnNominalCompletion() override;
// Registers the IO watch, then runs RunOnThread() directly or submits startCallback.
virtual bool Start() override;
// Runs TerminateOnThread() directly or submits endCallback.
virtual bool End() override;
void RunOnThread();
// Idempotent shutdown: only the first call (while bActive) does any work.
void TerminateOnThread(bool error = false);
// INIT
void PrepareStream();
void PrepareAsync();
// PUMP
void AsyncPump();
void StreamPump();
// END/INIT
void ReadNext();
void ReadNextAsync();
// Feeds buffered bytes to the output callback; returns the bytes processed by this call.
AuUInt32 TryPump();
// View over the writable part of buffer_ that ReadNextAsync() hands to BeginRead().
AuMemoryViewWrite nextWriteAsync_;
// Copy-initialized from the constructor's request argument; its listener is
// notified in TerminateOnThread().
IOPipeRequest request {};
// Set by TryPump(), consumed (and cleared) by Tick_FrameEpilogue().
bool bShouldReadNext {false};
// True once a nonzero byte limit has been reached; a limit of zero never ends the pipe here.
bool IsAtRequestedEnd()
{
return this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ <= this->bytesWritten_);
}
private:
AuSPtr<IOPipeProcessor> parent_;
// Not a union. NOTE(review): "mutex" below presumably means mutually exclusive, i.e.
// input_ serves the basic pipe and the async* members serve the AIO pipe — confirm.
struct /*not a union. the following members are mutex*/
{
IOPipeInputData input_;
AuSPtr<IAsyncTransaction> asyncTransaction_;
AuSPtr<IAsyncStreamAdapter> asyncAdapter_;
AuSPtr<IAsyncStreamReader> asyncStreamReader_;
};
// Destination callback copied from the request; its type selects the branch in TryPump().
IOPipeCallback output;
// Work units submitted by Start()/End() when called off the processor thread.
IOWorkStart startCallback;
IOWorkEnd endCallback;
// Cleared by the first TerminateOnThread(); ReadNext() is a no-op afterwards.
bool bActive {true};
// Size passed to buffer_.Allocate() in PrepareStream().
AuUInt32 bufferSize_ {};
// Upper bound on the length of a single async read (see ReadNextAsync()).
AuUInt32 frameCap_ {};
// Running total of bytes TryPump() reported as processed.
AuUInt bytesWritten_ {};
// request.lengthOrZero; zero means no limit is enforced.
AuUInt bytesWrittenLimit_ {};
// Intermediate buffer between the source and the output callback.
AuByteBuffer buffer_;
};
// Builds a pipe whose source is an async transaction.
// Mem-initializers are listed in member declaration order, which is the order
// they are executed in regardless of how they are written.
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestAIO &request) :
    request(request),
    parent_(parent),
    output(request.output),
    startCallback(this),
    endCallback(this)
{
    // Source: keep the transaction and wrap it in a stream adapter/reader pair.
    this->asyncTransaction_ = request.asyncTransaction;
    this->asyncAdapter_ = NewAsyncStreamAdapter(request.asyncTransaction, request.isStream);
    SysAssert(this->asyncAdapter_);
    this->asyncStreamReader_ = this->asyncAdapter_->ToStreamReader();

    // Sizing: a zero in the request selects the request type's fallback value.
    if (request.bufferLengthOrZero)
    {
        this->bufferSize_ = request.bufferLengthOrZero;
    }
    else
    {
        this->bufferSize_ = request.kFallbackBufferSize;
    }

    if (request.pageLengthOrZero)
    {
        this->frameCap_ = request.pageLengthOrZero;
    }
    else
    {
        this->frameCap_ = request.kFallbackPageSize;
    }

    // Zero means "no byte limit" (see IsAtRequestedEnd).
    this->bytesWrittenLimit_ = request.lengthOrZero;
}
// Builds a pipe for the basic (non-async) request type.
// Mem-initializers are listed in member declaration order, which is the order
// they are executed in regardless of how they are written.
// NOTE(review): input_ is left default-constructed here, yet Start() and
// StreamPump() read input_.watchItem / input_.reader whenever asyncTransaction_
// is null — confirm where it is meant to be populated.
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request) :
    request(request),
    parent_(parent),
    output(request.output),
    startCallback(this),
    endCallback(this)
{
    // Sizing: a zero in the request selects the request type's fallback value.
    if (request.pageLengthOrZero)
    {
        this->frameCap_ = request.pageLengthOrZero;
    }
    else
    {
        this->frameCap_ = request.kFallbackPageSize;
    }

    if (request.bufferLengthOrZero)
    {
        this->bufferSize_ = request.bufferLengthOrZero;
    }
    else
    {
        this->bufferSize_ = request.kFallbackBufferSize;
    }

    // Zero means "no byte limit" (see IsAtRequestedEnd).
    this->bytesWrittenLimit_ = request.lengthOrZero;
}
// End-of-frame hook: consumes the "read next" request flag and, if it was set,
// issues the next read.
void IOPipeWork::Tick_FrameEpilogue()
{
    const bool bReadRequested = AuExchange(this->bShouldReadNext, false);
    if (!bReadRequested)
    {
        return;
    }

    this->ReadNext();
}
// Tick hook: pumps whichever source this pipe was built with.
void IOPipeWork::Tick_Any()
{
    // No transaction means this is the plain stream-reader flavour.
    if (!this->asyncTransaction_)
    {
        this->StreamPump();
        return;
    }

    this->AsyncPump();
}
// Failure completion hook: shuts the pipe down with the error flag set.
void IOPipeWork::OnFailureCompletion()
{
    const bool bError = true;
    this->TerminateOnThread(bError);
}
// Nominal completion hook: shuts the pipe down without flagging an error.
void IOPipeWork::OnNominalCompletion()
{
    const bool bError = false;
    this->TerminateOnThread(bError);
}
////////////////////////////////////////////////////////////
IOWorkStart::IOWorkStart(IOPipeWork *parent) : parent(parent)
{
}
void IOWorkStart::OnRun()
{
parent->RunOnThread();
}
void IOWorkStart::OnCanceled()
{
parent->TerminateOnThread();
}
////////////////////////////////////////////////////////////
IOWorkEnd::IOWorkEnd(IOPipeWork *parent) : parent(parent)
{
}
void IOWorkEnd::OnRun()
{
parent->TerminateOnThread();
}
void IOWorkEnd::OnCanceled()
{
parent->TerminateOnThread();
}
////////////////////////////////////////////////////////////
bool IOPipeWork::Start()
{
AuSPtr<IIOProcessorItem> ret;
if (this->asyncTransaction_)
{
ret = this->parent_->parent_->StartIOWatch(this->asyncAdapter_->ToWaitable(), AuSharedFromThis());
if (!ret)
{
return false;
}
}
else
{
ret = this->parent_->parent_->StartIOWatch(this->input_.watchItem, AuSharedFromThis());
if (!ret)
{
return false;
}
}
this->watch = ret;
if (this->parent_->parent_->CheckThread())
{
RunOnThread();
return true;
}
return this->parent_->
parent_->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
&this->startCallback));
}
// Requests termination of the pipe. Terminates inline when CheckThread() passes;
// otherwise queues the end work unit and returns whether that submission succeeded.
bool IOPipeWork::End()
{
    const auto &pProcessor = this->parent_->parent_;

    if (!pProcessor->CheckThread())
    {
        // The AuSPtr is built from SharedFromThis() plus the address of the embedded unit.
        return pProcessor->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
                                                                         &this->endCallback));
    }

    TerminateOnThread();
    return true;
}
// Makes sure the intermediate buffer exists. If it is still empty after the
// allocation attempt, a memory error is pushed and the pipe is terminated.
void IOPipeWork::PrepareStream()
{
    auto &pipeBuffer = this->buffer_;

    if (pipeBuffer.IsEmpty())
    {
        pipeBuffer.Allocate(this->bufferSize_);
        pipeBuffer.flagCircular = true; // !!!
    }

    if (!pipeBuffer.IsEmpty())
    {
        return;
    }

    SysPushErrorMem();
    TerminateOnThread(true);
}
void IOPipeWork::PrepareAsync()
{
PrepareStream();
ReadNext();
}
void IOPipeWork::AsyncPump()
{
AuMemoryViewWrite internalBuffer;
auto err = this->asyncStreamReader_->Dequeue(0, internalBuffer);
if (err != EStreamError::eErrorNone)
{
SysPushErrorIO("Async Stream Error: {}", err);
TerminateOnThread(true);
return;
}
if (internalBuffer.length == 0)
{
// end of stream
TerminateOnThread(false);
return;
}
err = this->asyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer);
if (err != EStreamError::eErrorNone)
{
SysPushErrorIO("Async Stream Error: {}", err);
TerminateOnThread(true);
return;
}
this->buffer_.writePtr += internalBuffer.length;
TryPump();
}
void IOPipeWork::StreamPump()
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
AuUInt read {};
try
{
if (this->input_.reader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) !=
AuIO::EStreamError::eErrorNone)
{
TerminateOnThread();
return;
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer_.writePtr += read;
if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length)
{
this->buffer_.writePtr = this->buffer_.base;
}
TryPump();
}
void IOPipeWork::ReadNextAsync()
{
try
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
canBuffer = AuMin(AuUInt(this->frameCap_), canBuffer);
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer);
if (this->asyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
AuIO::EStreamError::eErrorNone)
{
TerminateOnThread(true);
return;
}
if (!this->nextWriteAsync_)
{
TerminateOnThread();
return;
}
}
catch (...)
{
SysPushErrorCatch();
}
}
// Requests more input, unless the pipe is inactive or the byte limit was reached
// (in which case the pipe is terminated without error).
void IOPipeWork::ReadNext()
{
    if (!this->bActive)
    {
        return;
    }

    if (IsAtRequestedEnd())
    {
        TerminateOnThread(false);
        return;
    }

    if (this->asyncTransaction_)
    {
        ReadNextAsync();
        return;
    }

    // Stream flavour: only notify the backend, if there is one.
    if (this->input_.backend)
    {
        this->input_.backend->OnEndPump();
    }
}
// Drains buffered bytes into the configured output callback.
//
// Each pass offers the contiguous readable region to the callback, advances the
// read head by however much it consumed, and repeats for as long as a pass
// consumed something. bShouldReadNext is raised when the buffer has been
// drained, when a pass consumed nothing, or when the read head ends at the start
// of the buffer.
//
// Returns the number of bytes consumed by this call, which is also added to
// bytesWritten_.
AuUInt32 IOPipeWork::TryPump()
{
    AuUInt bytesProcessedTotal {};
    AuUInt bytesProcessed {};

    do
    {
        AuUInt canRead = this->buffer_.RemainingBytes();
        if (!canRead)
        {
            break;
        }

        // Only offer the contiguous run up to the end of the allocation
        canRead = AuMin<AuUInt>(canRead, (this->buffer_.length + this->buffer_.base) - this->buffer_.readPtr);
        if (!canRead)
        {
            // Read head sits at the end of the allocation: nothing contiguous to
            // offer. bytesProcessed is always zero at this point, so leave the loop.
            break;
        }

        auto oldReadHeadPtr = this->buffer_.readPtr;
        auto readHead = oldReadHeadPtr - this->buffer_.base;

        try
        {
            if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart)
            {
                // The callback gets the whole buffer and moves the read head itself
                if (!this->output.handleBufferedStream.onData->OnDataAvailable(this->buffer_))
                {
                    // Rejected: restore the read head to where this pass started
                    bytesProcessed = 0;
                    this->buffer_.readPtr = this->buffer_.base + readHead;
                }
                else
                {
                    bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
                }
            }
            else
            {
                // Forwarding: the intercepter reports its consumption through bytesProcessed
                if (!this->output.forwardStream.intercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.writer))
                {
                    bytesProcessed = 0;
                }
                else
                {
                    this->buffer_.readPtr += bytesProcessed;
                }
            }
        }
        catch (...)
        {
            SysPushErrorCatch();
        }

        bytesProcessedTotal += bytesProcessed;

        if (this->buffer_.readPtr == this->buffer_.writePtr)
        {
            // Fully drained: rewind both heads and ask for more input
            this->buffer_.readPtr = this->buffer_.base;
            this->buffer_.writePtr = this->buffer_.base;
            this->bShouldReadNext = true;
        }
        else if (!bytesProcessed)
        {
            // The consumer made no progress: more input is needed
            this->bShouldReadNext = true;
        }

        // Compaction: once the read head is in the last quarter of the buffer, move the
        // unread tail back to the start. ReadNextAsync only ever reads into the
        // contiguous space after the write head, so without this a packet larger than
        // the space left at the end of the allocation could never be buffered whole.
        //
        // The unread region is taken from the CURRENT read pointer: bytes consumed by
        // this pass must not be copied to the front and offered again. The move is
        // skipped when the write head is behind the read head (buffer just rewound, or
        // the write head wrapped), because the length would otherwise be negative.
        {
            auto unreadPtr = this->buffer_.readPtr;
            auto writePtr = this->buffer_.writePtr;

            if ((AuUInt(unreadPtr - this->buffer_.base) > (this->buffer_.length / 4) * 3) &&
                (writePtr >= unreadPtr))
            {
                const AuUInt unreadLength = AuUInt(writePtr - unreadPtr);

                AuMemmove(this->buffer_.base, unreadPtr, unreadLength);

                this->buffer_.writePtr = this->buffer_.base + unreadLength;
                this->buffer_.readPtr = this->buffer_.base;
            }
        }

        if (this->output.type == EPipeCallbackType::eWriteToWriter)
        {
            if (this->output.forwardStream.bFlushWriter)
            {
                this->output.forwardStream.writer->Flush();
            }
        }
    }
    while (AuExchange(bytesProcessed, 0)); // go again only if this pass consumed something

    if (this->buffer_.readPtr == this->buffer_.base)
    {
        this->bShouldReadNext = true;
    }

    this->bytesWritten_ += bytesProcessedTotal;
    return bytesProcessedTotal;
}
// Start-up sequence: notify the input backend (if any), then prepare whichever
// source flavour this pipe uses.
void IOPipeWork::RunOnThread()
{
    if (this->input_.backend)
    {
        this->input_.backend->OnStart();
    }

    if (!this->asyncTransaction_)
    {
        PrepareStream();
        return;
    }

    PrepareAsync();
}
void IOPipeWork::TerminateOnThread(bool error)
{
if (!this->bActive)
{
return;
}
this->bActive = false;
if (this->watch)
{
watch->StopWatch();
}
// Free after init?
if (this->input_.backend)
{
this->input_.backend->OnEnd();
}
if (this->request.listener)
{
if (error)
{
// We explicitly failed...
this->request.listener->OnPipeFailureEvent();
}
else if (this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ > this->bytesWritten_))
{
// Finished without error early
this->request.listener->OnPipeFailureEvent();
}
else
{
// We finished...
this->request.listener->OnPipeSuccessEvent();
}
}
}
IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) :
parent_(parent)
{
}
// Creates a pipe job for a basic (stream reader) request. The new job is given
// the shared pointer the parent IO processor returns from ToPipeProcessor().
AuSPtr<IIOPipeWork> IOPipeProcessor::NewBasicPipe(const IOPipeRequestBasic &request)
{
    // TODO: Incomplete
    auto pPipeProcessor = AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor());
    return AuMakeShared<IOPipeWork>(pPipeProcessor, request);
}
// Creates a pipe job for an async transaction request. The new job is given the
// shared pointer the parent IO processor returns from ToPipeProcessor().
AuSPtr<IIOPipeWork> IOPipeProcessor::NewAIOPipe(const IOPipeRequestAIO &request)
{
    // TODO: Incomplete
    auto pPipeProcessor = AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor());
    return AuMakeShared<IOPipeWork>(pPipeProcessor, request);
}
}