/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IOPipeProcessor.cpp Date: 2022-6-6 Author: Reece ***/ #include #include #include "IOProcessor.hpp" #include "IOPipeProcessor.hpp" namespace Aurora::IO { IOPipeWork::IOPipeWork(const AuSPtr &parent, const IOPipeRequestAIO &request) : parent_(parent), request(request), startCallback(this), endCallback(this), output(request.output) { this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize; this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize; this->uBytesWrittenLimit_ = request.uLengthOrZero; this->pAsyncTransaction_ = request.pAsyncTransaction; this->pAsyncAdapter_ = NewAsyncStreamAdapter(request.pAsyncTransaction, request.bIsStream); SysAssert(this->pAsyncAdapter_); this->pAsyncStreamReader_ = this->pAsyncAdapter_->ToStreamReader(); } IOPipeWork::IOPipeWork(const AuSPtr &parent, const IOPipeRequestBasic &request) : parent_(parent), request(request), startCallback(this), endCallback(this), output(request.output) { this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize; this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize; this->uBytesWrittenLimit_ = request.uLengthOrZero; } void IOPipeWork::Tick_FrameEpilogue() { if (AuExchange(this->bShouldReadNext, false)) { this->ReadNext(); } } void IOPipeWork::Tick_Any() { if (this->pAsyncTransaction_) { this->AsyncPump(); } else { this->StreamPump(); } } void IOPipeWork::OnFailureCompletion() { this->TerminateOnThread(true); } void IOPipeWork::OnNominalCompletion() { this->TerminateOnThread(); } //////////////////////////////////////////////////////////// IOWorkStart::IOWorkStart(IOPipeWork *parent) : parent(parent) { } void IOWorkStart::OnRun() { parent->RunOnThread(); } void IOWorkStart::OnCanceled() { parent->TerminateOnThread(); } //////////////////////////////////////////////////////////// IOWorkEnd::IOWorkEnd(IOPipeWork *parent) : parent(parent) { } void IOWorkEnd::OnRun() { parent->TerminateOnThread(); } void IOWorkEnd::OnCanceled() { parent->TerminateOnThread(); } //////////////////////////////////////////////////////////// bool IOPipeWork::Start() { AuSPtr ret; if (this->pAsyncTransaction_) { auto pWaitable = this->pAsyncAdapter_->ToWaitable(); if (pWaitable) { ret = this->parent_->parent_->StartIOWatch(pWaitable, AuSharedFromThis()); if (!ret) { return false; } } } else { ret = this->parent_->parent_->StartIOWatch(this->input_.pWatchItem, AuSharedFromThis()); if (!ret) { return false; } } this->pWatch = ret; if (this->parent_->parent_->CheckThread()) { RunOnThread(); return true; } return this->parent_-> parent_->SubmitIOWorkItem(AuSPtr(this->SharedFromThis(), &this->startCallback)); } bool IOPipeWork::End() { if (this->parent_->parent_->CheckThread()) { TerminateOnThread(); return true; } return this->parent_-> parent_->SubmitIOWorkItem(AuSPtr(this->SharedFromThis(), &this->endCallback)); } void IOPipeWork::PrepareStream() { if (!this->buffer_.IsEmpty()) { return; } if (!this->buffer_.Allocate(this->uBufferSize_, AuHwInfo::GetPageSize(), true)) { SysPushErrorMem(); TerminateOnThread(true); return; } } void IOPipeWork::PrepareAsync() { PrepareStream(); ReadNext(); } void IOPipeWork::AsyncPump() { AuMemoryViewWrite internalBuffer; auto err = this->pAsyncStreamReader_->Dequeue(0, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); TerminateOnThread(true); return; } if (internalBuffer.length == 0) { // end of stream TerminateOnThread(false); return; } err = this->pAsyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); TerminateOnThread(true); return; } this->buffer_.writePtr += internalBuffer.length; TryPump(); } void IOPipeWork::StreamPump() { AuUInt canBuffer = this->buffer_.RemainingWrite(); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); AuUInt read {}; try { if (this->input_.pReader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) != AuIO::EStreamError::eErrorNone) { TerminateOnThread(); return; } } catch (...) { SysPushErrorCatch(); } this->buffer_.writePtr += read; if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length) { this->buffer_.writePtr = this->buffer_.base; } TryPump(); } void IOPipeWork::ReadNextAsync() { try { AuUInt canBuffer = this->buffer_.RemainingWrite(); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); canBuffer = AuMin(AuUInt(this->uFrameCap_), canBuffer); this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer); if (this->pAsyncStreamReader_->BeginRead(AuSPtr(this->SharedFromThis(), &this->nextWriteAsync_)) != AuIO::EStreamError::eErrorNone) { TerminateOnThread(true); return; } if (!this->nextWriteAsync_) { TerminateOnThread(); return; } } catch (...) { SysPushErrorCatch(); } } void IOPipeWork::ReadNext() { if (!this->bActive) { return; } if (IsAtRequestedEnd()) { TerminateOnThread(false); return; } if (this->pAsyncTransaction_) { ReadNextAsync(); } else { if (this->input_.pBackend) { this->input_.pBackend->OnEndPump(); } } } AuUInt32 IOPipeWork::TryPump() { AuUInt bytesProcessedTotal {}; AuUInt bytesProcessed {}; do { AuUInt canRead = this->buffer_.RemainingBytes(); if (!canRead) { break; } canRead = AuMin(canRead, (this->buffer_.length + this->buffer_.base) - this->buffer_.readPtr); if (!canRead) { continue; } auto oldReadHeadPtr = this->buffer_.readPtr; auto readHead = oldReadHeadPtr - this->buffer_.base; try { if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart) { if (this->output.handleBufferedStream.pOnData) { if (!this->output.handleBufferedStream.pOnData->OnDataAvailable(this->buffer_)) { bytesProcessed = 0; this->buffer_.readPtr = this->buffer_.base + readHead; } else { bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr; } } } else if (this->output.type == EPipeCallbackType::eWriteToWriter) { if (this->output.forwardStream.pIntercepter) { if (!this->output.forwardStream.pIntercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.pWriter)) { bytesProcessed = 0; } else { this->buffer_.readPtr += bytesProcessed; } } } } catch (...) { SysPushErrorCatch(); } bytesProcessedTotal += bytesProcessed; if (this->buffer_.readPtr == this->buffer_.writePtr) { this->buffer_.readPtr = this->buffer_.base; this->buffer_.writePtr = this->buffer_.base; this->bShouldReadNext = true; } else if (!bytesProcessed) { this->bShouldReadNext = true; } // Prevent fucky end of allocation issues by moving the tail end of a partially buffered // stream back to the start // Should help with pacing massive files, where faster hardware can just vruum through a smaller buffer, leaving // a load of small deserializable packets at the start of a large buffer, for the CPU to immediately start failing OnDataAvailable's // much later into the stream, where a larger packet may overhang into memory we haven't reserved // I really don't know how ReadNextAsync can be expected to wrap around a ring buffer // We'd need to know if this pass failed, and if the read head is near the end, it'd know // to wrap back around to zero. An overengineered pain and liability. // This should work if (readHead > (this->buffer_.length / 4) * 3) { auto readPtr = this->buffer_.base + readHead; auto len = this->buffer_.writePtr - readPtr; AuMemmove(this->buffer_.base, readPtr, len); this->buffer_.writePtr = this->buffer_.base + len; this->buffer_.readPtr = this->buffer_.base; } if (this->output.type == EPipeCallbackType::eWriteToWriter) { if (this->output.forwardStream.bFlushWriter) { if (this->output.forwardStream.pWriter) { this->output.forwardStream.pWriter->Flush(); } } } } while (AuExchange(bytesProcessed, 0)); if (this->buffer_.readPtr == this->buffer_.base) { this->bShouldReadNext = true; } this->uBytesWritten_ += bytesProcessedTotal; if (this->request.pListener) { this->request.pListener->OnPipePartialEvent(bytesProcessedTotal); } return bytesProcessedTotal; } bool IOPipeWork::IsAtRequestedEnd() { return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_); } AuByteBuffer *IOPipeWork::GetBuffer() { return &this->buffer_; } void IOPipeWork::RunOnThread() { if (this->input_.pBackend) { this->input_.pBackend->OnStart(); } if (this->pAsyncTransaction_) { PrepareAsync(); } else { PrepareStream(); } } void IOPipeWork::TerminateOnThread(bool error) { if (!this->bActive) { return; } this->bActive = false; if (this->pWatch) { this->pWatch->StopWatch(); } if (this->request.pListener) { if (error) { // We explicitly failed... this->request.pListener->OnPipeFailureEvent(); } else if (this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ > this->uBytesWritten_)) { // Finished without error early this->request.pListener->OnPipeFailureEvent(); } else { // We finished... this->request.pListener->OnPipeSuccessEvent(); } this->request.pListener.reset(); } this->output.handleBufferedStream.pOnData.reset(); this->output.forwardStream.pIntercepter.reset(); this->output.forwardStream.pWriter.reset(); if (this->input_.pBackend) { this->input_.pBackend->OnEnd(); this->input_.pBackend.reset(); } if (auto transaction = this->pAsyncTransaction_) { transaction->Reset(); this->pAsyncTransaction_.reset(); } this->pAsyncAdapter_.reset(); this->pAsyncStreamReader_.reset(); } IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) : parent_(parent) { } AuSPtr IOPipeProcessor::NewBasicPipe(const IOPipeRequestBasic &request) { // TODO: Incomplete return AuMakeShared(AuStaticCast(this->parent_->ToPipeProcessor()), request); } AuSPtr IOPipeProcessor::NewAIOPipe(const IOPipeRequestAIO &request) { // TODO: Incomplete return AuMakeShared(AuStaticCast(this->parent_->ToPipeProcessor()), request); } }