/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuIOPipeProcessor.cpp Date: 2022-6-6 Author: Reece ***/ #include #include #include "AuIOProcessor.hpp" #include "AuIOPipeProcessor.hpp" #include namespace Aurora::IO { IOPipeWork::IOPipeWork(const AuSPtr &parent, const IOPipeRequestAIO &request) : parent_(parent), request(request), startCallback(this), endCallback(this), output(request.output) { this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize; this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize; this->uBytesWrittenLimit_ = request.uLengthOrZero; this->uBytesWrittenTarget_ = request.uMinBytesToRead ? request.uMinBytesToRead : request.uLengthOrZero; this->pAsyncTransaction_ = request.pAsyncTransaction; this->pAsyncAdapter_ = Adapters::NewAsyncStreamAdapter(request.pAsyncTransaction, request.bIsStream); SysAssert(this->pAsyncAdapter_); this->pAsyncAdapter_->SetReadOffset(request.uStartOffset); this->pAsyncAdapter_->SetWriteOffset(request.uStartOffset); this->pAsyncStreamReader_ = this->pAsyncAdapter_->ToStreamReader(); } IOPipeWork::IOPipeWork(const AuSPtr &parent, const IOPipeRequestBasic &request) : parent_(parent), request(request), startCallback(this), endCallback(this), output(request.output) { this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize; this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize; this->uBytesWrittenLimit_ = request.uLengthOrZero; this->uBytesWrittenTarget_ = request.uMinBytesToRead ? request.uMinBytesToRead : request.uLengthOrZero; } void IOPipeWork::Tick_FrameEpilogue() { if (this->bWritingAheadLowLatency) { this->TryPump(); } if (AuExchange(this->bShouldReadNext, false)) { this->ReadNext(); } } void IOPipeWork::Tick_Any() { if (this->pAsyncTransaction_) { this->AsyncPump(); } else { this->DoReallocTick(); this->StreamPump(); } } void IOPipeWork::OnFailureCompletion() { this->TerminateOnThread(true); } void IOPipeWork::OnNominalCompletion() { this->TerminateOnThread(); } //////////////////////////////////////////////////////////// IOWorkStart::IOWorkStart(IOPipeWork *parent) : parent(parent) { } void IOWorkStart::OnRun() { parent->RunOnThread(); } void IOWorkStart::OnCanceled() { parent->TerminateOnThread(); } //////////////////////////////////////////////////////////// IOWorkEnd::IOWorkEnd(IOPipeWork *parent) : parent(parent) { } void IOWorkEnd::OnRun() { parent->TerminateOnThread(); } void IOWorkEnd::OnCanceled() { parent->TerminateOnThread(); } //////////////////////////////////////////////////////////// bool IOPipeWork::Start() { AuSPtr ret; this->iStartTickMS_ = AuTime::CurrentClockMS(); if (this->pAsyncTransaction_) { auto pWaitable = this->pAsyncAdapter_->ToWaitable(); if (pWaitable) { ret = this->parent_->parent_->StartIOWatch(pWaitable, AuSharedFromThis()); if (!ret) { return false; } } } else { ret = this->parent_->parent_->StartIOWatch(this->input_.pWatchItem, AuSharedFromThis()); if (!ret) { return false; } } this->pWatch = ret; if (this->parent_->parent_->CheckThread()) { RunOnThread(); return true; } return this->parent_-> parent_->SubmitIOWorkItem(AuSPtr(this->SharedFromThis(), &this->startCallback)); } bool IOPipeWork::End() { if (this->parent_->parent_->CheckThread()) { TerminateOnThread(); return true; } return this->parent_-> parent_->SubmitIOWorkItem(AuSPtr(this->SharedFromThis(), &this->endCallback)); } AuInt64 IOPipeWork::GetLastTickMS() { return this->throughput_.GetLastFrameTimeWall(); } AuInt64 IOPipeWork::GetStartTickMS() { return this->iStartTickMS_; } double IOPipeWork::GetPredictedThroughput() { return this->throughput_.GetEstimatedHertz(); } AuUInt64 IOPipeWork::GetBytesProcessed() { return this->uBytesWritten_; } AuUInt64 IOPipeWork::GetBytesProcessedInterframe() { return this->uBytesWritten_ + this->bytesProcessedInterframe_; } void IOPipeWork::PrepareStream() { if (!this->buffer_.IsEmpty()) { return; } if (!this->buffer_.Allocate(this->uBufferSize_, AuHwInfo::GetPageSize(), true)) { SysPushErrorMem(); TerminateOnThread(true); return; } } void IOPipeWork::PrepareAsync() { PrepareStream(); ReadNext(); } void IOPipeWork::AsyncPump() { AuMemoryViewWrite internalBuffer; auto err = this->pAsyncStreamReader_->Dequeue(0, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); TerminateOnThread(true); return; } if (internalBuffer.length == 0) { // end of stream TerminateOnThread(false); return; } err = this->pAsyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); TerminateOnThread(true); return; } this->buffer_.writePtr += internalBuffer.length; // end of low-latency read-ahead tick if (this->bWritingAheadLowLatency) { // shift if running out of linear space auto readHead = this->buffer_.readPtr - this->buffer_.base; if (readHead > (this->buffer_.length / 4) * 3) { auto readPtr = this->buffer_.base + readHead; auto len = this->buffer_.writePtr - readPtr; AuMemmove(this->buffer_.base, readPtr, len); this->buffer_.writePtr = this->buffer_.base + len; this->buffer_.readPtr = this->buffer_.base; } this->bWritingAheadLowLatency = false; } this->DoReallocTick(); // attempt low-latency read-ahead if (!this->bWritingAheadLowLatency && this->buffer_.CanWrite(this->uFrameCap_) /*ensure we can run ahead*/ && !this->IsAtRequestedEnd() /*do not preemptively terminate before the last callback is fired*/) { this->bWritingAheadLowLatency = true; this->ReadNext(); // TryPump is delegated to the frame epilogue so that we can do a batched send of the next frames reads // followed by a tick of frame[-1] } else { this->TryPump(); } } void IOPipeWork::StreamPump() { AuUInt canBuffer = this->GetNextFrameLength2(); AuUInt read {}; try { if (this->input_.pReader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) != AuIO::EStreamError::eErrorNone) { TerminateOnThread(); return; } } catch (...) { SysPushErrorCatch(); } this->buffer_.writePtr += read; if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length) { this->buffer_.writePtr = this->buffer_.base; } TryPump(); } bool IOPipeWork::ReallocateLater(AuUInt uLength) { if (!uLength) { return false; } if (this->uPendingRealloc_) { return false; } this->bLastReallocFail = false; this->uPendingRealloc_ = uLength; return true; } bool IOPipeWork::IsReallocationPending() { return bool(this->uPendingRealloc_); } bool IOPipeWork::DidLastReallocFail() { return this->bLastReallocFail; } AuUInt IOPipeWork::GetSuccessfulReallocations() { return this->uReallocs_[0]; } AuUInt IOPipeWork::GetFailedReallocations() { return this->uReallocs_[1]; } AuUInt IOPipeWork::SetNextFrameTargetLength(AuUInt uLength) { return AuExchange(this->uBytesPerFrame_, uLength); } AuUInt IOPipeWork::GetNextFrameTargetLength() { return this->uBytesPerFrame_; } AuUInt IOPipeWork::GetNextFrameLength(AuUInt uBytesMax) { if (auto uAltMax = this->uBytesPerFrame_) { return AuMin(uAltMax, uBytesMax); } else { return uBytesMax; } } AuUInt IOPipeWork::GetNextFrameLength2() { AuUInt canBuffer = this->buffer_.RemainingWrite(); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); canBuffer = this->GetNextFrameLength(canBuffer); return canBuffer; } void IOPipeWork::ReadNextAsync() { try { AuUInt canBuffer = this->GetNextFrameLength2(); this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, GetNextFrameLength(canBuffer)); if (this->pAsyncStreamReader_->BeginRead(AuSPtr(this->SharedFromThis(), &this->nextWriteAsync_)) != AuIO::EStreamError::eErrorNone) { if (this->bWritingAheadLowLatency) { this->bWritingAheadIOUOneTerminate = true; } else { TerminateOnThread(true); } return; } if (!this->nextWriteAsync_) { TerminateOnThread(); return; } } catch (...) { SysPushErrorCatch(); } } void IOPipeWork::ReadNext() { if (!this->bActive) { return; } if (IsAtRequestedEnd()) { TerminateOnThread(false); return; } if (this->pAsyncTransaction_) { ReadNextAsync(); } else { if (this->input_.pBackend) { this->input_.pBackend->OnEndPump(); } } } AuUInt32 IOPipeWork::TryPump() { AuUInt &bytesProcessedTotal = this->bytesProcessedInterframe_; AuUInt bytesProcessed {}; bool bIsCullingLastFrame {}; bytesProcessedTotal = 0; do { AuUInt canRead2 = this->buffer_.RemainingBytes(); AuUInt canRead = canRead2; if (!canRead) { break; } canRead = AuMin(canRead, (this->buffer_.length + this->buffer_.base) - this->buffer_.readPtr); if (!canRead) { continue; } auto oldReadHeadPtr = this->buffer_.readPtr; auto readHead = oldReadHeadPtr - this->buffer_.base; auto oldWriteHeadPtr = this->buffer_.writePtr; auto writeHead = oldWriteHeadPtr - this->buffer_.base; auto uInterframeProgress = this->GetBytesProcessedInterframe(); if ((bIsCullingLastFrame = ((this->uBytesWrittenLimit_ && canRead2 + uInterframeProgress > this->uBytesWrittenLimit_)))) { auto uLastFrameBytes = this->uBytesWrittenLimit_ - uInterframeProgress; auto uAbsDataToRead = AuMin(canRead, uLastFrameBytes); this->buffer_.writePtr = this->buffer_.readPtr + uAbsDataToRead; } if (this->pProtocolStack) { this->pProtocolStack->DoTick(); } try { if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart) { if (this->output.handleBufferedStream.pOnData) { if (!this->output.handleBufferedStream.pOnData->OnDataAvailable(this->buffer_)) { bytesProcessed = 0; this->buffer_.readPtr = this->buffer_.base + readHead; } else { bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr; } } else { bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr; } } else if (this->output.type == EPipeCallbackType::eWriteToWriter) { if (this->output.forwardStream.pIntercepter && this->output.forwardStream.pWriter) { if (!this->output.forwardStream.pIntercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.pWriter)) { bytesProcessed = 0; } else { this->buffer_.readPtr += bytesProcessed; } } else { bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr; } } } catch (...) { SysPushErrorCatch(); } bytesProcessedTotal += bytesProcessed; if (!this->bWritingAheadLowLatency) { if (oldWriteHeadPtr != this->buffer_.writePtr) { this->buffer_.writePtr = this->buffer_.base + writeHead; } if (bIsCullingLastFrame) { this->bShouldReadNext = false; break; } if (this->buffer_.readPtr == this->buffer_.writePtr) { this->buffer_.readPtr = this->buffer_.base; this->buffer_.writePtr = this->buffer_.base; this->bShouldReadNext = true; } else if (!bytesProcessed) { this->bShouldReadNext = true; } // Prevent fucky end of allocation issues by moving the tail end of a partially buffered stream back to the start // Should help with packing massive files, where faster disks can spin through smaller frames, leaving // the CPU to catch up towards the end of the buffer, at which point the linearity breaks. // We must instead force linearity, and with the assumption we can move peekable memory around, we must eventually // move the tail end of the buffer back to the start, just so we can continue that stream view linearity. // I really don't know how ReadNextAsync can be expected to wrap around a ring buffer. // We'd need to know if this pass failed, and if the read head is near the end, it'd know to wrap back around to zero. // An overengineered pain and liability. // This should allow us to continue working in linear space without resorting to circular ring buffers { auto readHead = this->buffer_.readPtr - this->buffer_.base; if (readHead > (this->buffer_.length / 4) * 3) { auto readPtr = this->buffer_.base + readHead; auto len = this->buffer_.writePtr - readPtr; AuMemmove(this->buffer_.base, readPtr, len); this->buffer_.writePtr = this->buffer_.base + len; this->buffer_.readPtr = this->buffer_.base; } } } if (this->output.type == EPipeCallbackType::eWriteToWriter) { if (this->output.forwardStream.bFlushWriter) { if (this->output.forwardStream.pWriter) { this->output.forwardStream.pWriter->Flush(); } } } } while (AuExchange(bytesProcessed, 0)); if (!this->bWritingAheadLowLatency) { if (!bIsCullingLastFrame) { if (this->buffer_.readPtr == this->buffer_.base) { this->bShouldReadNext = true; } } } this->uBytesWritten_ += bytesProcessedTotal; this->throughput_.OnUpdate(bytesProcessedTotal); if (this->request.pListener) { this->request.pListener->OnPipePartialEvent(bytesProcessedTotal); } if (bIsCullingLastFrame) { this->TerminateOnThread(false); } if (this->bWritingAheadIOUOneTerminate) { this->TerminateOnThread(true); } return bytesProcessedTotal; } bool IOPipeWork::IsAtRequestedEnd() { return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_); } void IOPipeWork::FailRealloc() { this->uReallocs_[1]++; this->bLastReallocFail = true; if (auto pListener = this->request.pListener) { pListener->OnPipeReallocEvent(false); } } void IOPipeWork::SuccessRealloc() { this->uReallocs_[0]++; if (auto pListener = this->request.pListener) { pListener->OnPipeReallocEvent(true); } } void IOPipeWork::DoReallocTick() { auto uNextSize = AuExchange(this->uPendingRealloc_, 0); if (!uNextSize) { return; } AuByteBuffer replacement(uNextSize, false, false); if (!replacement) { this->FailRealloc(); return; } if (!replacement.WriteFrom(this->buffer_)) { this->FailRealloc(); return; } this->buffer_ = AuMove(replacement); this->SuccessRealloc(); } AuByteBuffer *IOPipeWork::GetBuffer() { return &this->buffer_; } void IOPipeWork::RunOnThread() { if (this->input_.pBackend) { this->input_.pBackend->OnStart(); } if (this->pAsyncTransaction_) { PrepareAsync(); } else { PrepareStream(); } } void IOPipeWork::TerminateOnThread(bool error) { if (!this->bActive) { return; } this->bActive = false; if (this->output.type == EPipeCallbackType::eWriteToWriter) { if (this->output.forwardStream.bCloseWriter) { if (auto pWriter = this->output.forwardStream.pWriter) { pWriter->Close(); } } } if (this->pWatch) { this->pWatch->StopWatch(); } if (this->request.pListener) { if (error) { // We explicitly failed... this->request.pListener->OnPipeFailureEvent(); } else if (this->uBytesWrittenTarget_ && (this->uBytesWrittenTarget_ > this->uBytesWritten_)) { // Finished without error early this->request.pListener->OnPipeFailureEvent(); } else { // We finished... this->request.pListener->OnPipeSuccessEvent(); } this->request.pListener.reset(); } this->output.handleBufferedStream.pOnData.reset(); this->output.forwardStream.pIntercepter.reset(); this->output.forwardStream.pWriter.reset(); if (this->input_.pBackend) { this->input_.pBackend->OnEnd(); this->input_.pBackend.reset(); } if (auto transaction = this->pAsyncTransaction_) { transaction->Reset(); this->pAsyncTransaction_.reset(); } this->pAsyncAdapter_.reset(); this->pAsyncStreamReader_.reset(); if (this->pProtocolStack) { this->pProtocolStack->Destroy(); this->pProtocolStack.reset(); } this->PrivateUserDataClear(); } IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) : parent_(parent) { } AuSPtr IOPipeProcessor::NewBasicPipe(const IOPipeRequestBasic &request) { // TODO: Incomplete return AuMakeShared(AuStaticCast(this->parent_->ToPipeProcessor()), request); } AuSPtr IOPipeProcessor::NewAIOPipe(const IOPipeRequestAIO &request) { // TODO: Incomplete return AuMakeShared(AuStaticCast(this->parent_->ToPipeProcessor()), request); } }