[*] IOPipeProcessor improvements

[*] NT async file: error 38 "Reached the end of the file." should be handled as a zero length packet much like ERROR_BROKEN_PIPE for stream read interface consistency
This commit is contained in:
Reece Wilson 2022-07-20 15:10:07 +01:00
parent 8fcebb91a3
commit 8fb7b7e1ee
3 changed files with 85 additions and 19 deletions

View File

@ -26,10 +26,17 @@ namespace Aurora::IO
bool isStream {false};
/**
* @brief internal frame size or zero if fallback
* @brief internal frame size, that is one iteration of file or stream read, in bytes or zero if fallback
* Windows is inclined to read every single requested byte of a file stream asynchronously
* Streams, on all platforms, yield as soon as there is data available to copy over (usually)
*/
AuUInt32 pageLengthOrZero {};
/**
* @brief internal buffer size or zero if fallback
*/
AuUInt32 bufferLengthOrZero {};
/**
* @brief event listener
*/
@ -38,6 +45,11 @@ namespace Aurora::IO
/**
* @brief Used as the buffer size for streams of page length 0
*/
AuUInt32 fallbackPageSize {4096 * 50};
static const AuUInt32 kFallbackPageSize {1024 * 1024 * 5};
/**
* @brief Used as the buffer size for streams of buffer length 0
*/
static const AuUInt32 kFallbackBufferSize {1024 * 1024 * 50};
};
}

View File

@ -447,7 +447,9 @@ namespace Aurora::IO::FS
bool NtAsyncFileTransaction::Failed()
{
return this->hasFailed && this->osErrorCode != ERROR_BROKEN_PIPE;
return this->hasFailed &&
this->osErrorCode != ERROR_BROKEN_PIPE &&
this->osErrorCode != ERROR_HANDLE_EOF;
}
AuUInt NtAsyncFileTransaction::GetOSErrorCode()
@ -485,7 +487,8 @@ namespace Aurora::IO::FS
}
else
{
if (GetLastError() == ERROR_BROKEN_PIPE)
if (this->osErrorCode == ERROR_BROKEN_PIPE ||
this->osErrorCode == ERROR_HANDLE_EOF)
{
auto pipe = this->ntIpcPipeImpl.lock();

View File

@ -71,6 +71,12 @@ namespace Aurora::IO
AuMemoryViewWrite nextWriteAsync_;
IOPipeRequest request {};
bool bShouldReadNext {false};
bool IsAtRequestedEnd()
{
return this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ <= this->bytesWritten_);
}
private:
AuSPtr<IOPipeProcessor> parent_;
@ -87,6 +93,7 @@ namespace Aurora::IO
IOWorkEnd endCallback;
bool bActive {true};
AuUInt32 bufferSize_ {};
AuUInt32 frameCap_ {};
AuUInt bytesWritten_ {};
AuUInt bytesWrittenLimit_ {};
AuByteBuffer buffer_;
@ -100,8 +107,8 @@ namespace Aurora::IO
endCallback(this),
output(request.output)
{
auto pageLen = request.pageLengthOrZero ? request.pageLengthOrZero : request.fallbackPageSize;
this->bufferSize_ = pageLen;
this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize;
this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize;
this->bytesWrittenLimit_ = request.lengthOrZero;
this->asyncTransaction_ = request.asyncTransaction;
this->asyncAdapter_ = NewAsyncStreamAdapter(request.asyncTransaction, request.isStream);
@ -116,8 +123,8 @@ namespace Aurora::IO
endCallback(this),
output(request.output)
{
auto pageLen = request.pageLengthOrZero ? request.pageLengthOrZero : request.fallbackPageSize;
this->bufferSize_ = pageLen;
this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize;
this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize;
this->bytesWrittenLimit_ = request.lengthOrZero;
}
@ -148,7 +155,6 @@ namespace Aurora::IO
void IOPipeWork::OnNominalCompletion()
{
this->TerminateOnThread();
}
@ -286,10 +292,6 @@ namespace Aurora::IO
}
this->buffer_.writePtr += internalBuffer.length;
if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length)
{
this->buffer_.writePtr = this->buffer_.base;
}
TryPump();
}
@ -329,11 +331,18 @@ namespace Aurora::IO
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
canBuffer = AuMin(AuUInt(this->frameCap_), canBuffer);
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer);
if (this->asyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
AuIO::EStreamError::eErrorNone)
{
TerminateOnThread(true);
return;
}
if (!this->nextWriteAsync_)
{
TerminateOnThread();
return;
@ -352,6 +361,12 @@ namespace Aurora::IO
return;
}
if (IsAtRequestedEnd())
{
TerminateOnThread(false);
return;
}
if (this->asyncTransaction_)
{
ReadNextAsync();
@ -384,24 +399,33 @@ namespace Aurora::IO
continue;
}
auto oldReadHeadPtr = this->buffer_.readPtr;
auto readHead = oldReadHeadPtr - this->buffer_.base;
try
{
if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart)
{
auto readHead = this->buffer_.readPtr - this->buffer_.base;
if (!this->output.handleBufferedStream.onData->OnDataAvailable(this->buffer_))
{
bytesProcessed = 0;
this->buffer_.readPtr = this->buffer_.base + readHead;
}
else
{
bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
}
}
else
{
if (!this->output.forwardStream.intercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.writer))
{
break;
bytesProcessed = 0;
}
else
{
this->buffer_.readPtr += bytesProcessed;
}
this->buffer_.readPtr += bytesProcessed;
}
}
catch (...)
@ -415,8 +439,35 @@ namespace Aurora::IO
{
this->buffer_.readPtr = this->buffer_.base;
this->buffer_.writePtr = this->buffer_.base;
this->bShouldReadNext = true;
}
else if (!bytesProcessed)
{
this->bShouldReadNext = true;
}
// Prevent fucky end of allocation issues by moving the tail end of a partially buffered
// stream back to the start
// Should help with pacing massive files, where faster hardware can just vruum through a smaller buffer, leaving
// a load of small deserializable packets at the start of a large buffer, for the CPU to immediately start failing OnDataAvailable's
// much later into the stream, where a larger packet may overhang into memory we haven't reserved
// I really don't know how ReadNextAsync can be excepted to wrap around a ring buffer
// We'd need to know if this pass failed, and if the read head is near the end, it'd know
// to wrap back around to zero. An overengineered pain and liability.
// This should work
if (readHead > (this->buffer_.length / 4) * 3)
{
auto readPtr = this->buffer_.base + readHead;
auto len = this->buffer_.writePtr - readPtr;
AuMemmove(this->buffer_.base, readPtr, len);
this->buffer_.writePtr = this->buffer_.base + len;
this->buffer_.readPtr = this->buffer_.base;
}
if (this->output.type == EPipeCallbackType::eWriteToWriter)
{
@ -428,15 +479,15 @@ namespace Aurora::IO
}
while (AuExchange(bytesProcessed, 0));
if (this->buffer_.readPtr == this->buffer_.writePtr)
if (this->buffer_.readPtr == this->buffer_.base)
{
this->bShouldReadNext = true;
}
this->bytesWritten_ += bytesProcessedTotal;
return bytesProcessedTotal;
}
void IOPipeWork::RunOnThread()
{
if (this->input_.backend)