From 8fb7b7e1eecf290497747bbeff53c7088ea62125 Mon Sep 17 00:00:00 2001 From: Reece Wilson Date: Wed, 20 Jul 2022 15:10:07 +0100 Subject: [PATCH] [*] IOPipeProcessor improvements [*] NT async file: error 38 "Reached the end of the file." should be handled as a zero length packet much like ERROR_BROKEN_PIPE for stream read interface consistency --- Include/Aurora/IO/IOPipeRequest.hpp | 16 +++++- Source/IO/FS/Async.NT.cpp | 7 ++- Source/IO/IOPipeProcessor.cpp | 81 +++++++++++++++++++++++------ 3 files changed, 85 insertions(+), 19 deletions(-) diff --git a/Include/Aurora/IO/IOPipeRequest.hpp b/Include/Aurora/IO/IOPipeRequest.hpp index 90aa4a98..628e75af 100644 --- a/Include/Aurora/IO/IOPipeRequest.hpp +++ b/Include/Aurora/IO/IOPipeRequest.hpp @@ -26,10 +26,17 @@ namespace Aurora::IO bool isStream {false}; /** - * @brief internal frame size or zero if fallback + * @brief internal frame size, that is one iteration of file or stream read, in bytes or zero if fallback + * Windows is inclined to read every single requested byte of a file stream asynchronously + * Streams, on all platforms, yield as soon as there is data available to copy over (usually) */ AuUInt32 pageLengthOrZero {}; + /** + * @brief internal buffer size or zero if fallback + */ + AuUInt32 bufferLengthOrZero {}; + /** * @brief event listener */ @@ -38,6 +45,11 @@ namespace Aurora::IO /** * @brief Used as the buffer size for streams of page length 0 */ - AuUInt32 fallbackPageSize {4096 * 50}; + static const AuUInt32 kFallbackPageSize {1024 * 1024 * 5}; + + /** + * @brief Used as the buffer size for streams of buffer length 0 + */ + static const AuUInt32 kFallbackBufferSize {1024 * 1024 * 50}; }; } \ No newline at end of file diff --git a/Source/IO/FS/Async.NT.cpp b/Source/IO/FS/Async.NT.cpp index c957cf3a..113910a9 100644 --- a/Source/IO/FS/Async.NT.cpp +++ b/Source/IO/FS/Async.NT.cpp @@ -447,7 +447,9 @@ namespace Aurora::IO::FS bool NtAsyncFileTransaction::Failed() { - return this->hasFailed && this->osErrorCode != ERROR_BROKEN_PIPE; + return this->hasFailed && + this->osErrorCode != ERROR_BROKEN_PIPE && + this->osErrorCode != ERROR_HANDLE_EOF; } AuUInt NtAsyncFileTransaction::GetOSErrorCode() @@ -485,7 +487,8 @@ namespace Aurora::IO::FS } else { - if (GetLastError() == ERROR_BROKEN_PIPE) + if (this->osErrorCode == ERROR_BROKEN_PIPE || + this->osErrorCode == ERROR_HANDLE_EOF) { auto pipe = this->ntIpcPipeImpl.lock(); diff --git a/Source/IO/IOPipeProcessor.cpp b/Source/IO/IOPipeProcessor.cpp index b1cd9aaa..3b4ee3b9 100644 --- a/Source/IO/IOPipeProcessor.cpp +++ b/Source/IO/IOPipeProcessor.cpp @@ -71,6 +71,12 @@ namespace Aurora::IO AuMemoryViewWrite nextWriteAsync_; IOPipeRequest request {}; bool bShouldReadNext {false}; + + bool IsAtRequestedEnd() + { + return this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ <= this->bytesWritten_); + } + private: AuSPtr parent_; @@ -87,6 +93,7 @@ namespace Aurora::IO IOWorkEnd endCallback; bool bActive {true}; AuUInt32 bufferSize_ {}; + AuUInt32 frameCap_ {}; AuUInt bytesWritten_ {}; AuUInt bytesWrittenLimit_ {}; AuByteBuffer buffer_; @@ -100,8 +107,8 @@ namespace Aurora::IO endCallback(this), output(request.output) { - auto pageLen = request.pageLengthOrZero ? request.pageLengthOrZero : request.fallbackPageSize; - this->bufferSize_ = pageLen; + this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; + this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; this->bytesWrittenLimit_ = request.lengthOrZero; this->asyncTransaction_ = request.asyncTransaction; this->asyncAdapter_ = NewAsyncStreamAdapter(request.asyncTransaction, request.isStream); @@ -116,8 +123,8 @@ namespace Aurora::IO endCallback(this), output(request.output) { - auto pageLen = request.pageLengthOrZero ? request.pageLengthOrZero : request.fallbackPageSize; - this->bufferSize_ = pageLen; + this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; + this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; this->bytesWrittenLimit_ = request.lengthOrZero; } @@ -148,7 +155,6 @@ namespace Aurora::IO void IOPipeWork::OnNominalCompletion() { - this->TerminateOnThread(); } @@ -286,10 +292,6 @@ namespace Aurora::IO } this->buffer_.writePtr += internalBuffer.length; - if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length) - { - this->buffer_.writePtr = this->buffer_.base; - } TryPump(); } @@ -329,11 +331,18 @@ namespace Aurora::IO { AuUInt canBuffer = this->buffer_.RemainingWrite(); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); + canBuffer = AuMin(AuUInt(this->frameCap_), canBuffer); this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer); if (this->asyncStreamReader_->BeginRead(AuSPtr(this->SharedFromThis(), &this->nextWriteAsync_)) != AuIO::EStreamError::eErrorNone) + { + TerminateOnThread(true); + return; + } + + if (!this->nextWriteAsync_) { TerminateOnThread(); return; @@ -352,6 +361,12 @@ namespace Aurora::IO return; } + if (IsAtRequestedEnd()) + { + TerminateOnThread(false); + return; + } + if (this->asyncTransaction_) { ReadNextAsync(); @@ -384,24 +399,33 @@ namespace Aurora::IO continue; } + auto oldReadHeadPtr = this->buffer_.readPtr; + auto readHead = oldReadHeadPtr - this->buffer_.base; + try { if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart) { - auto readHead = this->buffer_.readPtr - this->buffer_.base; if (!this->output.handleBufferedStream.onData->OnDataAvailable(this->buffer_)) { + bytesProcessed = 0; this->buffer_.readPtr = this->buffer_.base + readHead; } + else + { + bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr; + } } else { if (!this->output.forwardStream.intercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.writer)) { - break; + bytesProcessed = 0; + } + else + { + this->buffer_.readPtr += bytesProcessed; } - - this->buffer_.readPtr += bytesProcessed; } } catch (...) @@ -415,8 +439,35 @@ namespace Aurora::IO { this->buffer_.readPtr = this->buffer_.base; this->buffer_.writePtr = this->buffer_.base; + this->bShouldReadNext = true; } + else if (!bytesProcessed) + { + this->bShouldReadNext = true; + } + + // Prevent fucky end of allocation issues by moving the tail end of a partially buffered + // stream back to the start + + // Should help with pacing massive files, where faster hardware can just vruum through a smaller buffer, leaving + // a load of small deserializable packets at the start of a large buffer, for the CPU to immediately start failing OnDataAvailable's + // much later into the stream, where a larger packet may overhang into memory we haven't reserved + + // I really don't know how ReadNextAsync can be excepted to wrap around a ring buffer + // We'd need to know if this pass failed, and if the read head is near the end, it'd know + // to wrap back around to zero. An overengineered pain and liability. + + // This should work + + if (readHead > (this->buffer_.length / 4) * 3) + { + auto readPtr = this->buffer_.base + readHead; + auto len = this->buffer_.writePtr - readPtr; + AuMemmove(this->buffer_.base, readPtr, len); + this->buffer_.writePtr = this->buffer_.base + len; + this->buffer_.readPtr = this->buffer_.base; + } if (this->output.type == EPipeCallbackType::eWriteToWriter) { @@ -428,15 +479,15 @@ namespace Aurora::IO } while (AuExchange(bytesProcessed, 0)); - if (this->buffer_.readPtr == this->buffer_.writePtr) + if (this->buffer_.readPtr == this->buffer_.base) { this->bShouldReadNext = true; } + this->bytesWritten_ += bytesProcessedTotal; return bytesProcessedTotal; } - void IOPipeWork::RunOnThread() { if (this->input_.backend)