2022-06-11 23:01:27 +00:00
|
|
|
/***
|
|
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
|
|
|
|
File: IOPipeProcessor.cpp
|
|
|
|
Date: 2022-6-6
|
|
|
|
Author: Reece
|
|
|
|
***/
|
|
|
|
#include <Source/RuntimeInternal.hpp>
|
|
|
|
#include <Aurora/IO/IOExperimental.hpp>
|
|
|
|
#include "IOPipeProcessor.hpp"
|
2022-06-21 04:49:36 +00:00
|
|
|
#include "IOProcessor.hpp"
|
2022-06-11 23:01:27 +00:00
|
|
|
|
|
|
|
namespace Aurora::IO
|
|
|
|
{
|
2022-06-21 04:49:36 +00:00
|
|
|
// Forward declaration: IOWorkStart and IOWorkEnd below keep a raw IOPipeWork pointer
// before the full IOPipeWork definition is visible.
struct IOPipeWork;
|
2022-06-11 23:01:27 +00:00
|
|
|
|
2022-06-21 04:49:36 +00:00
|
|
|
struct IOWorkStart : IIOProcessorWorkUnit
|
|
|
|
{
|
|
|
|
IOWorkStart(IOPipeWork *parent);
|
|
|
|
|
|
|
|
void OnRun() override;
|
|
|
|
void OnCanceled() override;
|
|
|
|
|
|
|
|
IOPipeWork *parent {};
|
|
|
|
};
|
|
|
|
|
|
|
|
struct IOWorkEnd : IIOProcessorWorkUnit
|
|
|
|
{
|
|
|
|
IOWorkEnd(IOPipeWork *parent);
|
|
|
|
|
|
|
|
void OnRun() override;
|
|
|
|
void OnCanceled() override;
|
|
|
|
|
|
|
|
IOPipeWork *parent {};
|
|
|
|
};
|
|
|
|
|
|
|
|
// A single pipe job: moves bytes from an input (an async transaction or a stream reader)
// through an intermediate byte buffer into the configured output callback.
// It is registered as the event listener of its own IO watch (see Start()).
struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis<IIOPipeWork>
{
    IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestAIO &request);
    IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request);

    // Watch handle returned by StartIOWatch(); stopped in TerminateOnThread().
    AuSPtr<IIOProcessorItem> watch;

    void Tick_FrameEpilogue() override;
    void Tick_Any() override;

    void OnFailureCompletion() override;
    void OnNominalCompletion() override;

    // `override` already implies virtual; the redundant keyword was dropped.
    bool Start() override;

    bool End() override;

    void RunOnThread();
    void TerminateOnThread(bool error = false);

    // INIT
    void PrepareStream();
    void PrepareAsync();

    // PUMP
    void AsyncPump();
    void StreamPump();

    // END/INIT
    void ReadNext();
    void ReadNextAsync();

    AuUInt32 TryPump();

    // View handed to the async reader by ReadNextAsync(); it must outlive the read,
    // hence it is a member rather than a local.
    AuMemoryViewWrite nextWriteAsync_;
    IOPipeRequest request {};
    // Set by TryPump() once the buffer has been drained; consumed by Tick_FrameEpilogue().
    bool bShouldReadNext {false};

private:
    AuSPtr<IOPipeProcessor> parent_;

    // Not a union, but the two input kinds are mutually exclusive:
    // either input_ is used (stream path) or the async* members are (async path).
    struct
    {
        IOPipeInputData input_;
        AuSPtr<IAsyncTransaction> asyncTransaction_;
        AuSPtr<IAsyncStreamAdapter> asyncAdapter_;
        AuSPtr<IAsyncStreamReader> asyncStreamReader_;
    };

    IOPipeCallback output;
    IOWorkStart startCallback;
    IOWorkEnd endCallback;
    // Cleared exactly once by TerminateOnThread(); guards against double termination.
    bool bActive {true};
    AuUInt32 bufferSize_ {};
    // NOTE(review): bytesWritten_ is never updated and bytesWrittenLimit_ is never
    // enforced anywhere in this file - confirm whether length limiting is still pending.
    AuUInt bytesWritten_ {};
    AuUInt bytesWrittenLimit_ {};
    AuByteBuffer buffer_;
};
|
|
|
|
|
|
|
|
|
|
|
|
// Builds a pipe whose input is an async transaction.
// The mem-initializers are listed in member declaration order (request, parent_, output,
// startCallback, endCallback); that is the order they always run in, so listing them
// this way avoids -Wreorder and misleading reading order.
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestAIO &request) :
    request(request),
    parent_(parent),
    output(request.output),
    startCallback(this),
    endCallback(this)
{
    // Use the requested page length, falling back to the request's default when it is zero.
    auto pageLen = request.pageLengthOrZero ? request.pageLengthOrZero : request.fallbackPageSize;
    this->bufferSize_ = pageLen;
    this->bytesWrittenLimit_ = request.lengthOrZero;

    this->asyncTransaction_ = request.asyncTransaction;
    this->asyncAdapter_ = NewAsyncStreamAdapter(request.asyncTransaction, request.isStream);
    // A constructor cannot report failure; a missing adapter is treated as fatal.
    SysAssert(this->asyncAdapter_);
    this->asyncStreamReader_ = this->asyncAdapter_->ToStreamReader();
}
|
|
|
|
|
|
|
|
// Builds a pipe for a basic (non-async-transaction) request.
// The mem-initializers are listed in member declaration order (request, parent_, output,
// startCallback, endCallback); that is the order they always run in, so listing them
// this way avoids -Wreorder and misleading reading order.
//
// NOTE(review): input_ is left default-constructed here, yet Start() reads
// input_.watchItem and StreamPump() dereferences input_.reader - confirm where the
// request's input is supposed to be copied in.
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request) :
    request(request),
    parent_(parent),
    output(request.output),
    startCallback(this),
    endCallback(this)
{
    // Use the requested page length, falling back to the request's default when it is zero.
    auto pageLen = request.pageLengthOrZero ? request.pageLengthOrZero : request.fallbackPageSize;
    this->bufferSize_ = pageLen;
    this->bytesWrittenLimit_ = request.lengthOrZero;
}
|
|
|
|
|
|
|
|
void IOPipeWork::Tick_FrameEpilogue()
|
|
|
|
{
|
2022-06-22 20:24:04 +00:00
|
|
|
if (AuExchange(this->bShouldReadNext, false))
|
2022-06-21 04:49:36 +00:00
|
|
|
{
|
|
|
|
this->ReadNext();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::Tick_Any()
|
|
|
|
{
|
|
|
|
if (this->asyncTransaction_)
|
|
|
|
{
|
|
|
|
this->AsyncPump();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
this->StreamPump();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::OnFailureCompletion()
|
|
|
|
{
|
|
|
|
this->TerminateOnThread(true);
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::OnNominalCompletion()
|
|
|
|
{
|
|
|
|
|
|
|
|
this->TerminateOnThread();
|
|
|
|
}
|
|
|
|
|
|
|
|
////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
IOWorkStart::IOWorkStart(IOPipeWork *parent) : parent(parent)
|
|
|
|
{
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOWorkStart::OnRun()
|
|
|
|
{
|
|
|
|
parent->RunOnThread();
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOWorkStart::OnCanceled()
|
|
|
|
{
|
|
|
|
parent->TerminateOnThread();
|
|
|
|
}
|
|
|
|
|
|
|
|
////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
IOWorkEnd::IOWorkEnd(IOPipeWork *parent) : parent(parent)
|
|
|
|
{
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOWorkEnd::OnRun()
|
|
|
|
{
|
|
|
|
parent->TerminateOnThread();
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOWorkEnd::OnCanceled()
|
|
|
|
{
|
|
|
|
parent->TerminateOnThread();
|
|
|
|
}
|
|
|
|
|
|
|
|
////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
bool IOPipeWork::Start()
|
|
|
|
{
|
|
|
|
AuSPtr<IIOProcessorItem> ret;
|
|
|
|
|
|
|
|
if (this->asyncTransaction_)
|
|
|
|
{
|
|
|
|
ret = this->parent_->parent_->StartIOWatch(this->asyncAdapter_->ToWaitable(), AuSharedFromThis());
|
|
|
|
if (!ret)
|
|
|
|
{
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
ret = this->parent_->parent_->StartIOWatch(this->input_.watchItem, AuSharedFromThis());
|
|
|
|
|
|
|
|
if (!ret)
|
|
|
|
{
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
this->watch = ret;
|
|
|
|
|
|
|
|
if (this->parent_->parent_->CheckThread())
|
|
|
|
{
|
|
|
|
RunOnThread();
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
return this->parent_->
|
|
|
|
parent_->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
|
|
|
|
&this->startCallback));
|
|
|
|
}
|
|
|
|
|
|
|
|
bool IOPipeWork::End()
|
|
|
|
{
|
|
|
|
if (this->parent_->parent_->CheckThread())
|
|
|
|
{
|
|
|
|
TerminateOnThread();
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
return this->parent_->
|
|
|
|
parent_->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
|
|
|
|
&this->endCallback));
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::PrepareStream()
|
|
|
|
{
|
|
|
|
if (this->buffer_.IsEmpty())
|
|
|
|
{
|
|
|
|
this->buffer_.Allocate(this->bufferSize_);
|
|
|
|
this->buffer_.flagCircular = true; // !!!
|
|
|
|
}
|
|
|
|
|
|
|
|
if (this->buffer_.IsEmpty())
|
|
|
|
{
|
|
|
|
SysPushErrorMem();
|
|
|
|
TerminateOnThread(true);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::PrepareAsync()
|
|
|
|
{
|
|
|
|
PrepareStream();
|
|
|
|
ReadNext();
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::AsyncPump()
|
|
|
|
{
|
|
|
|
AuMemoryViewWrite internalBuffer;
|
|
|
|
|
|
|
|
auto err = this->asyncStreamReader_->Dequeue(0, internalBuffer);
|
|
|
|
if (err != EStreamError::eErrorNone)
|
|
|
|
{
|
|
|
|
SysPushErrorIO("Async Stream Error: {}", err);
|
|
|
|
TerminateOnThread(true);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (internalBuffer.length == 0)
|
|
|
|
{
|
|
|
|
// end of stream
|
|
|
|
TerminateOnThread(false);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
err = this->asyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer);
|
|
|
|
if (err != EStreamError::eErrorNone)
|
|
|
|
{
|
|
|
|
SysPushErrorIO("Async Stream Error: {}", err);
|
|
|
|
TerminateOnThread(true);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
this->buffer_.writePtr += internalBuffer.length;
|
|
|
|
if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length)
|
|
|
|
{
|
|
|
|
this->buffer_.writePtr = this->buffer_.base;
|
|
|
|
}
|
|
|
|
|
|
|
|
TryPump();
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::StreamPump()
|
|
|
|
{
|
|
|
|
AuUInt canBuffer = this->buffer_.RemainingWrite();
|
|
|
|
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
|
|
|
|
|
|
|
|
AuUInt read {};
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (this->input_.reader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) !=
|
|
|
|
AuIO::EStreamError::eErrorNone)
|
|
|
|
{
|
|
|
|
TerminateOnThread();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
SysPushErrorCatch();
|
|
|
|
}
|
|
|
|
|
|
|
|
this->buffer_.writePtr += read;
|
|
|
|
if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length)
|
|
|
|
{
|
|
|
|
this->buffer_.writePtr = this->buffer_.base;
|
|
|
|
}
|
|
|
|
|
|
|
|
TryPump();
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::ReadNextAsync()
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
AuUInt canBuffer = this->buffer_.RemainingWrite();
|
2022-07-04 22:41:29 +00:00
|
|
|
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
|
2022-06-21 04:49:36 +00:00
|
|
|
|
|
|
|
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer);
|
|
|
|
|
|
|
|
if (this->asyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
|
|
|
|
AuIO::EStreamError::eErrorNone)
|
|
|
|
{
|
|
|
|
TerminateOnThread();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
SysPushErrorCatch();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::ReadNext()
|
|
|
|
{
|
|
|
|
if (!this->bActive)
|
|
|
|
{
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (this->asyncTransaction_)
|
|
|
|
{
|
|
|
|
ReadNextAsync();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
if (this->input_.backend)
|
|
|
|
{
|
|
|
|
this->input_.backend->OnEndPump();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Drains buffered data into the output, looping for as long as the output keeps
// consuming bytes. Returns the number of bytes the forwarding path reported as consumed.
// Sets bShouldReadNext whenever the read head has caught up with the write head.
//
// NOTE(review): the total is accumulated as AuUInt but returned as AuUInt32 - confirm the
// narrowing is acceptable.
// NOTE(review): the buffered-part path never updates the consumed counter, so it runs
// the loop body once and contributes nothing to the returned total - confirm.
AuUInt32 IOPipeWork::TryPump()
{
    AuUInt total {};
    AuUInt consumed {};

    do
    {
        AuUInt available = this->buffer_.RemainingBytes();
        if (!available)
        {
            break;
        }

        // Only hand out the contiguous part up to the end of the storage.
        available = AuMin<AuUInt>(available, (this->buffer_.length + this->buffer_.base) - this->buffer_.readPtr);
        if (!available)
        {
            // Jumps to the loop condition; `consumed` is zero at this point, so the loop ends.
            continue;
        }

        try
        {
            if (this->output.type != EPipeCallbackType::eTryHandleBufferedPart)
            {
                // Forwarding path: the intercepter reports how much it took via `consumed`.
                if (!this->output.forwardStream.intercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, available), consumed), this->output.forwardStream.writer))
                {
                    break;
                }

                // NOTE(review): the read head is advanced but never wrapped back to base
                // here, unlike the write head in the pump functions - confirm.
                this->buffer_.readPtr += consumed;
            }
            else
            {
                // Buffered path: the callback works on the buffer directly; when it
                // returns false, the read head is restored to where it was.
                const auto savedOffset = this->buffer_.readPtr - this->buffer_.base;
                if (!this->output.handleBufferedStream.onData->OnDataAvailable(this->buffer_))
                {
                    this->buffer_.readPtr = this->buffer_.base + savedOffset;
                }
            }
        }
        catch (...)
        {
            SysPushErrorCatch();
        }

        total += consumed;

        if (this->buffer_.readPtr == this->buffer_.writePtr)
        {
            // Read head caught up with the write head: rewind both and ask for more input.
            this->buffer_.readPtr = this->buffer_.base;
            this->buffer_.writePtr = this->buffer_.base;
            this->bShouldReadNext = true;
        }

        if (this->output.type == EPipeCallbackType::eWriteToWriter &&
            this->output.forwardStream.bFlushWriter)
        {
            this->output.forwardStream.writer->Flush();
        }
    }
    while (AuExchange(consumed, 0));

    if (this->buffer_.readPtr == this->buffer_.writePtr)
    {
        this->bShouldReadNext = true;
    }

    return total;
}
|
|
|
|
|
|
|
|
|
|
|
|
void IOPipeWork::RunOnThread()
|
|
|
|
{
|
|
|
|
if (this->input_.backend)
|
|
|
|
{
|
|
|
|
this->input_.backend->OnStart();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (this->asyncTransaction_)
|
|
|
|
{
|
|
|
|
PrepareAsync();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
PrepareStream();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void IOPipeWork::TerminateOnThread(bool error)
|
|
|
|
{
|
|
|
|
if (!this->bActive)
|
|
|
|
{
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
this->bActive = false;
|
|
|
|
|
|
|
|
if (this->watch)
|
|
|
|
{
|
|
|
|
watch->StopWatch();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Free after init?
|
|
|
|
if (this->input_.backend)
|
|
|
|
{
|
|
|
|
this->input_.backend->OnEnd();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (this->request.listener)
|
|
|
|
{
|
|
|
|
if (error)
|
|
|
|
{
|
|
|
|
this->request.listener->OnPipeFailureEvent();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
this->request.listener->OnPipeSuccessEvent();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remembers the IO processor this pipe processor belongs to.
IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) : parent_(parent)
{
}
|
|
|
|
|
|
|
|
// Creates a pipe for a basic request. The pipe is handed the shared pipe-processor
// handle obtained from the parent IO processor.
AuSPtr<IIOPipeWork> IOPipeProcessor::NewBasicPipe(const IOPipeRequestBasic &request)
{
    // TODO: Incomplete
    const auto pipeProcessor = AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor());
    return AuMakeShared<IOPipeWork>(pipeProcessor, request);
}
|
|
|
|
|
|
|
|
// Creates a pipe for an async-transaction request. The pipe is handed the shared
// pipe-processor handle obtained from the parent IO processor.
AuSPtr<IIOPipeWork> IOPipeProcessor::NewAIOPipe(const IOPipeRequestAIO &request)
{
    // TODO: Incomplete
    const auto pipeProcessor = AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor());
    return AuMakeShared<IOPipeWork>(pipeProcessor, request);
}
|
2022-06-11 23:01:27 +00:00
|
|
|
}
|