/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IOPipeProcessor.cpp Date: 2022-6-6 Author: Reece ***/ #include #include #include "IOPipeProcessor.hpp" #include "IOProcessor.hpp" namespace Aurora::IO { struct IOPipeWork; struct IOWorkStart : IIOProcessorWorkUnit { IOWorkStart(IOPipeWork *parent); void OnRun() override; void OnCanceled() override; IOPipeWork *parent {}; }; struct IOWorkEnd : IIOProcessorWorkUnit { IOWorkEnd(IOPipeWork *parent); void OnRun() override; void OnCanceled() override; IOPipeWork *parent {}; }; struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis { IOPipeWork(const AuSPtr &parent, const IOPipeRequestAIO &request); IOPipeWork(const AuSPtr &parent, const IOPipeRequestBasic &request); AuSPtr watch; void Tick_FrameEpilogue() override; void Tick_Any() override; void OnFailureCompletion() override; void OnNominalCompletion() override; virtual bool Start() override; virtual bool End() override; void RunOnThread(); void TerminateOnThread(bool error = false); // INIT void PrepareStream(); void PrepareAsync(); // PUMP void AsyncPump(); void StreamPump(); // END/INIT void ReadNext(); void ReadNextAsync(); AuUInt32 TryPump(); AuMemoryViewWrite nextWriteAsync_; IOPipeRequest request {}; bool bShouldReadNext {false}; bool IsAtRequestedEnd() { return this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ <= this->bytesWritten_); } private: AuSPtr parent_; struct /*not a union. the following members are mutex*/ { IOPipeInputData input_; AuSPtr asyncTransaction_; AuSPtr asyncAdapter_; AuSPtr asyncStreamReader_; }; IOPipeCallback output; IOWorkStart startCallback; IOWorkEnd endCallback; bool bActive {true}; AuUInt32 bufferSize_ {}; AuUInt32 frameCap_ {}; AuUInt bytesWritten_ {}; AuUInt bytesWrittenLimit_ {}; AuByteBuffer buffer_; }; IOPipeWork::IOPipeWork(const AuSPtr &parent, const IOPipeRequestAIO &request) : parent_(parent), request(request), startCallback(this), endCallback(this), output(request.output) { this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; this->bytesWrittenLimit_ = request.lengthOrZero; this->asyncTransaction_ = request.asyncTransaction; this->asyncAdapter_ = NewAsyncStreamAdapter(request.asyncTransaction, request.isStream); SysAssert(this->asyncAdapter_); this->asyncStreamReader_ = this->asyncAdapter_->ToStreamReader(); } IOPipeWork::IOPipeWork(const AuSPtr &parent, const IOPipeRequestBasic &request) : parent_(parent), request(request), startCallback(this), endCallback(this), output(request.output) { this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; this->bytesWrittenLimit_ = request.lengthOrZero; } void IOPipeWork::Tick_FrameEpilogue() { if (AuExchange(this->bShouldReadNext, false)) { this->ReadNext(); } } void IOPipeWork::Tick_Any() { if (this->asyncTransaction_) { this->AsyncPump(); } else { this->StreamPump(); } } void IOPipeWork::OnFailureCompletion() { this->TerminateOnThread(true); } void IOPipeWork::OnNominalCompletion() { this->TerminateOnThread(); } //////////////////////////////////////////////////////////// IOWorkStart::IOWorkStart(IOPipeWork *parent) : parent(parent) { } void IOWorkStart::OnRun() { parent->RunOnThread(); } void IOWorkStart::OnCanceled() { parent->TerminateOnThread(); } //////////////////////////////////////////////////////////// IOWorkEnd::IOWorkEnd(IOPipeWork *parent) : parent(parent) { } void IOWorkEnd::OnRun() { parent->TerminateOnThread(); } void IOWorkEnd::OnCanceled() { parent->TerminateOnThread(); } //////////////////////////////////////////////////////////// bool IOPipeWork::Start() { AuSPtr ret; if (this->asyncTransaction_) { ret = this->parent_->parent_->StartIOWatch(this->asyncAdapter_->ToWaitable(), AuSharedFromThis()); if (!ret) { return false; } } else { ret = this->parent_->parent_->StartIOWatch(this->input_.watchItem, AuSharedFromThis()); if (!ret) { return false; } } this->watch = ret; if (this->parent_->parent_->CheckThread()) { RunOnThread(); return true; } return this->parent_-> parent_->SubmitIOWorkItem(AuSPtr(this->SharedFromThis(), &this->startCallback)); } bool IOPipeWork::End() { if (this->parent_->parent_->CheckThread()) { TerminateOnThread(); return true; } return this->parent_-> parent_->SubmitIOWorkItem(AuSPtr(this->SharedFromThis(), &this->endCallback)); } void IOPipeWork::PrepareStream() { if (!this->buffer_.IsEmpty()) { return; } if (!this->buffer_.Allocate(this->bufferSize_, AuHwInfo::GetPageSize(), true)) { SysPushErrorMem(); TerminateOnThread(true); return; } this->buffer_.flagCircular = true; // !!! } void IOPipeWork::PrepareAsync() { PrepareStream(); ReadNext(); } void IOPipeWork::AsyncPump() { AuMemoryViewWrite internalBuffer; auto err = this->asyncStreamReader_->Dequeue(0, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); TerminateOnThread(true); return; } if (internalBuffer.length == 0) { // end of stream TerminateOnThread(false); return; } err = this->asyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); TerminateOnThread(true); return; } this->buffer_.writePtr += internalBuffer.length; TryPump(); } void IOPipeWork::StreamPump() { AuUInt canBuffer = this->buffer_.RemainingWrite(); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); AuUInt read {}; try { if (this->input_.reader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) != AuIO::EStreamError::eErrorNone) { TerminateOnThread(); return; } } catch (...) { SysPushErrorCatch(); } this->buffer_.writePtr += read; if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length) { this->buffer_.writePtr = this->buffer_.base; } TryPump(); } void IOPipeWork::ReadNextAsync() { try { AuUInt canBuffer = this->buffer_.RemainingWrite(); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); canBuffer = AuMin(AuUInt(this->frameCap_), canBuffer); this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer); if (this->asyncStreamReader_->BeginRead(AuSPtr(this->SharedFromThis(), &this->nextWriteAsync_)) != AuIO::EStreamError::eErrorNone) { TerminateOnThread(true); return; } if (!this->nextWriteAsync_) { TerminateOnThread(); return; } } catch (...) { SysPushErrorCatch(); } } void IOPipeWork::ReadNext() { if (!this->bActive) { return; } if (IsAtRequestedEnd()) { TerminateOnThread(false); return; } if (this->asyncTransaction_) { ReadNextAsync(); } else { if (this->input_.backend) { this->input_.backend->OnEndPump(); } } } AuUInt32 IOPipeWork::TryPump() { AuUInt bytesProcessedTotal {}; AuUInt bytesProcessed {}; do { AuUInt canRead = this->buffer_.RemainingBytes(); if (!canRead) { break; } canRead = AuMin(canRead, (this->buffer_.length + this->buffer_.base) - this->buffer_.readPtr); if (!canRead) { continue; } auto oldReadHeadPtr = this->buffer_.readPtr; auto readHead = oldReadHeadPtr - this->buffer_.base; try { if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart) { if (!this->output.handleBufferedStream.onData->OnDataAvailable(this->buffer_)) { bytesProcessed = 0; this->buffer_.readPtr = this->buffer_.base + readHead; } else { bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr; } } else { if (!this->output.forwardStream.intercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.writer)) { bytesProcessed = 0; } else { this->buffer_.readPtr += bytesProcessed; } } } catch (...) { SysPushErrorCatch(); } bytesProcessedTotal += bytesProcessed; if (this->buffer_.readPtr == this->buffer_.writePtr) { this->buffer_.readPtr = this->buffer_.base; this->buffer_.writePtr = this->buffer_.base; this->bShouldReadNext = true; } else if (!bytesProcessed) { this->bShouldReadNext = true; } // Prevent fucky end of allocation issues by moving the tail end of a partially buffered // stream back to the start // Should help with pacing massive files, where faster hardware can just vruum through a smaller buffer, leaving // a load of small deserializable packets at the start of a large buffer, for the CPU to immediately start failing OnDataAvailable's // much later into the stream, where a larger packet may overhang into memory we haven't reserved // I really don't know how ReadNextAsync can be expected to wrap around a ring buffer // We'd need to know if this pass failed, and if the read head is near the end, it'd know // to wrap back around to zero. An overengineered pain and liability. // This should work if (readHead > (this->buffer_.length / 4) * 3) { auto readPtr = this->buffer_.base + readHead; auto len = this->buffer_.writePtr - readPtr; AuMemmove(this->buffer_.base, readPtr, len); this->buffer_.writePtr = this->buffer_.base + len; this->buffer_.readPtr = this->buffer_.base; } if (this->output.type == EPipeCallbackType::eWriteToWriter) { if (this->output.forwardStream.bFlushWriter) { this->output.forwardStream.writer->Flush(); } } } while (AuExchange(bytesProcessed, 0)); if (this->buffer_.readPtr == this->buffer_.base) { this->bShouldReadNext = true; } this->bytesWritten_ += bytesProcessedTotal; if (this->request.listener) { this->request.listener->OnPipePartialEvent(bytesProcessedTotal); } return bytesProcessedTotal; } void IOPipeWork::RunOnThread() { if (this->input_.backend) { this->input_.backend->OnStart(); } if (this->asyncTransaction_) { PrepareAsync(); } else { PrepareStream(); } } void IOPipeWork::TerminateOnThread(bool error) { if (!this->bActive) { return; } this->bActive = false; if (this->watch) { watch->StopWatch(); } // Free after init? if (this->input_.backend) { this->input_.backend->OnEnd(); } if (this->request.listener) { if (error) { // We explicitly failed... this->request.listener->OnPipeFailureEvent(); } else if (this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ > this->bytesWritten_)) { // Finished without error early this->request.listener->OnPipeFailureEvent(); } else { // We finished... this->request.listener->OnPipeSuccessEvent(); } } } IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) : parent_(parent) { } AuSPtr IOPipeProcessor::NewBasicPipe(const IOPipeRequestBasic &request) { // TODO: Incomplete return AuMakeShared(AuStaticCast(this->parent_->ToPipeProcessor()), request); } AuSPtr IOPipeProcessor::NewAIOPipe(const IOPipeRequestAIO &request) { // TODO: Incomplete return AuMakeShared(AuStaticCast(this->parent_->ToPipeProcessor()), request); } }