[*] Fix IOPipeProcessor was checking against base + length instead of write head to determine stream consumer completion

[*] Fixup IPCPipe.NT recycling, including proper disconnect acknowledgment and removal of atomic release hack
[*] End of stream callback hack in Async.NT for IPC. net requires something else; this hack will do
[*] Fix io spin: bShouldReadNext wasn't reset on end of frame
This commit is contained in:
Reece Wilson 2022-06-22 21:24:04 +01:00
parent 0c3344fe46
commit 3a76aa6d69
7 changed files with 132 additions and 65 deletions

View File

@ -7,10 +7,12 @@
***/ ***/
#include <Source/RuntimeInternal.hpp> #include <Source/RuntimeInternal.hpp>
#include "FS.hpp" #include "FS.hpp"
#include <Source/IO/Loop/LSHandle.hpp>
//#include "IPCHandle.hpp"
#include "Async.NT.hpp" #include "Async.NT.hpp"
#include "FileAdvisory.NT.hpp" #include "FileAdvisory.NT.hpp"
#include <Source/IO/Loop/Loop.hpp> #include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/Loop/LSHandle.hpp> #include <Source/IO/IPC/IPCPipe.NT.hpp>
namespace Aurora::IO::FS namespace Aurora::IO::FS
{ {
@ -263,8 +265,8 @@ namespace Aurora::IO::FS
hold->hasFailed = true; hold->hasFailed = true;
hold->osErrorCode = dwErrorCode; hold->osErrorCode = dwErrorCode;
} }
hold->Complete(); hold->CompleteEx(true);
} }
bool NtAsyncFileTransaction::IDontWannaUsePorts() bool NtAsyncFileTransaction::IDontWannaUsePorts()
@ -360,18 +362,35 @@ namespace Aurora::IO::FS
bool NtAsyncFileTransaction::Failed() bool NtAsyncFileTransaction::Failed()
{ {
return this->hasFailed; return this->hasFailed && GetOSErrorCode() != ERROR_BROKEN_PIPE;
} }
AuUInt NtAsyncFileTransaction::GetOSErrorCode() AuUInt NtAsyncFileTransaction::GetOSErrorCode()
{ {
return Failed() ? this->osErrorCode : ERROR_SUCCESS; return this->hasFailed ? this->osErrorCode : ERROR_SUCCESS;
} }
bool NtAsyncFileTransaction::Complete() bool NtAsyncFileTransaction::CompleteEx(bool completeRoutine)
{ {
DWORD read {}; DWORD read {};
if (GetOSErrorCode() == ERROR_BROKEN_PIPE)
{
if (!completeRoutine)
{
auto pipe = this->ntIpcPipeImpl.lock();
DispatchCb(0);
if (pipe)
{
pipe->OnEndOfReadStream();
}
}
return true;
}
if (!this->lastAbstractStat_) if (!this->lastAbstractStat_)
{ {
return false; return false;
@ -382,7 +401,7 @@ namespace Aurora::IO::FS
// return false; // return false;
//} //}
if ((this->hasFailed) || if ((this->hasFailed) ||
::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false)) ::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false))
{ {
bool bLatched = this->latch_; bool bLatched = this->latch_;
@ -393,6 +412,11 @@ namespace Aurora::IO::FS
return false; return false;
} }
bool NtAsyncFileTransaction::Complete()
{
return CompleteEx(false);
}
AuUInt32 NtAsyncFileTransaction::GetLastPacketLength() AuUInt32 NtAsyncFileTransaction::GetLastPacketLength()
{ {
DWORD read {}; DWORD read {};

View File

@ -7,6 +7,11 @@
***/ ***/
#pragma once #pragma once
namespace Aurora::IO::IPC
{
struct IPCPipeImpl;
}
namespace Aurora::IO::FS namespace Aurora::IO::FS
{ {
struct FileHandle struct FileHandle
@ -49,6 +54,8 @@ namespace Aurora::IO::FS
bool Complete() override; bool Complete() override;
bool CompleteEx(bool completeRoutine);
bool Failed() override; bool Failed() override;
AuUInt GetOSErrorCode() override; AuUInt GetOSErrorCode() override;
@ -77,6 +84,8 @@ namespace Aurora::IO::FS
AuUInt32 osErrorCode {}; AuUInt32 osErrorCode {};
bool hasFailed {}; bool hasFailed {};
AuWPtr<Aurora::IO::IPC::IPCPipeImpl> ntIpcPipeImpl;
private: private:
AuSPtr<void> memoryHold_; AuSPtr<void> memoryHold_;
AuSPtr<FileHandle> handle_; AuSPtr<FileHandle> handle_;

View File

@ -222,7 +222,6 @@ namespace Aurora::IO
if (length && if (length &&
parent->lastAllocation->streamIndex != length) parent->lastAllocation->streamIndex != length)
{ {
AuLogDbg("44");
AuDebugBreak(); AuDebugBreak();
return EStreamError::eErrorStreamInterrupted; return EStreamError::eErrorStreamInterrupted;
} }
@ -232,7 +231,6 @@ namespace Aurora::IO
if (parent->asyncActive && !parent->transaction->Complete()) if (parent->asyncActive && !parent->transaction->Complete())
{ {
AuLogDbg("222");
AuDebugBreak(); AuDebugBreak();
return EStreamError::eErrorStreamInterrupted; return EStreamError::eErrorStreamInterrupted;
} }
@ -244,7 +242,6 @@ namespace Aurora::IO
parent->lastAllocation = AuMakeShared<AsyncStreamMemory>(internalView); parent->lastAllocation = AuMakeShared<AsyncStreamMemory>(internalView);
if (!parent->lastAllocation) if (!parent->lastAllocation)
{ {
AuLogDbg("aa");
AuDebugBreak(); AuDebugBreak();
return EStreamError::eErrorStreamInterrupted; return EStreamError::eErrorStreamInterrupted;
} }
@ -265,6 +262,14 @@ namespace Aurora::IO
{ {
out = {}; out = {};
// Transaction error
if (parent->transaction->Failed())
{
parent->asyncActive = false;
parent->transaction->Reset();
return EStreamError::eErrorStreamInterrupted;
}
// Async error // Async error
if (parent->errorCode.HasValue()) if (parent->errorCode.HasValue())
{ {
@ -274,6 +279,7 @@ namespace Aurora::IO
if (code != EStreamError::eErrorNone) if (code != EStreamError::eErrorNone)
{ {
parent->asyncActive = false;
parent->transaction->Reset(); parent->transaction->Reset();
return code; return code;
} }
@ -282,6 +288,8 @@ namespace Aurora::IO
auto length = parent->transaction->GetLastPacketLength(); auto length = parent->transaction->GetLastPacketLength();
if (!length) if (!length)
{ {
parent->asyncActive = false;
parent->transaction->Reset();
return EStreamError::eErrorNone; return EStreamError::eErrorNone;
} }
@ -309,6 +317,7 @@ namespace Aurora::IO
if (parent->lastAllocation->streamIndex == length) if (parent->lastAllocation->streamIndex == length)
{ {
parent->asyncActive = false;
parent->transaction->Reset(); parent->transaction->Reset();
} }
} }

View File

@ -122,7 +122,7 @@ namespace Aurora::IO
void IOPipeWork::Tick_FrameEpilogue() void IOPipeWork::Tick_FrameEpilogue()
{ {
if (this->bShouldReadNext) if (AuExchange(this->bShouldReadNext, false))
{ {
this->ReadNext(); this->ReadNext();
} }
@ -401,9 +401,10 @@ namespace Aurora::IO
bytesProcessedTotal += bytesProcessed; bytesProcessedTotal += bytesProcessed;
if (this->buffer_.readPtr == this->buffer_.base + this->buffer_.length) if (this->buffer_.readPtr == this->buffer_.writePtr)
{ {
this->buffer_.readPtr = this->buffer_.base; this->buffer_.readPtr = this->buffer_.base;
this->buffer_.writePtr = this->buffer_.base;
this->bShouldReadNext = true; this->bShouldReadNext = true;
} }
@ -417,8 +418,7 @@ namespace Aurora::IO
} }
while (AuExchange(bytesProcessed, 0)); while (AuExchange(bytesProcessed, 0));
; if (this->buffer_.readPtr == this->buffer_.writePtr)
if (this->buffer_.readPtr == this->buffer_.base + this->buffer_.length)
{ {
this->bShouldReadNext = true; this->bShouldReadNext = true;
} }

View File

@ -51,31 +51,31 @@ namespace Aurora::IO::IPC
#else #else
#define PROXY_INTERNAL_INTERFACE_(Base)\ #define PROXY_INTERNAL_INTERFACE_(Base)\
virtual void OnPresleep() override \ inline virtual void OnPresleep() override \
{ \ { \
Base OnPresleep(); \ Base OnPresleep(); \
}; \ }; \
virtual bool OnTrigger(AuUInt handle) override \ inline virtual bool OnTrigger(AuUInt handle) override \
{ \ { \
return Base OnTrigger(handle); \ return Base OnTrigger(handle); \
} \ } \
virtual void OnFinishSleep() override \ inline virtual void OnFinishSleep() override \
{ \ { \
Base OnFinishSleep(); \ Base OnFinishSleep(); \
} \ } \
virtual bool Singular() override \ inline virtual bool Singular() override \
{ \ { \
return Base Singular(); \ return Base Singular(); \
} \ } \
virtual AuUInt GetHandle() override \ inline virtual AuUInt GetHandle() override \
{ \ { \
return Base GetHandle(); \ return Base GetHandle(); \
} \ } \
virtual const AuList<AuUInt> &GetHandles() override \ inline virtual const AuList<AuUInt> &GetHandles() override \
{ \ { \
return Base GetHandles(); \ return Base GetHandles(); \
} \ } \
bool HasValidHandle() \ inline bool HasValidHandle() \
{ \ { \
return Base HasValidHandle(); \ return Base HasValidHandle(); \
} }

View File

@ -8,13 +8,14 @@
#include <Source/RuntimeInternal.hpp> #include <Source/RuntimeInternal.hpp>
#include "IPC.hpp" #include "IPC.hpp"
#include "IPCHandle.hpp" #include "IPCHandle.hpp"
#include "IPCPipe.NT.hpp"
#include <Source/IO/Loop/ILoopSourceEx.hpp> #include <Source/IO/Loop/ILoopSourceEx.hpp>
#include <Source/IO/Loop/LSHandle.hpp> #include <Source/IO/Loop/LSHandle.hpp>
#include <Source/IO/Loop/LSEvent.hpp> #include <Source/IO/Loop/LSEvent.hpp>
#include <Source/IO/FS/Async.NT.hpp> #include <Source/IO/FS/Async.NT.hpp>
#include "IPCPipe.NT.hpp"
namespace Aurora::IO::IPC namespace Aurora::IO::IPC
{ {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -35,46 +36,6 @@ namespace Aurora::IO::IPC
AuWPtr<IPCPipeImpl> parent_; AuWPtr<IPCPipeImpl> parent_;
}; };
struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis<IPCPipeImpl>
{
IPCPipeImpl(HANDLE clientHandle, HANDLE serverHandle, const IPCHandle &handle);
~IPCPipeImpl();
PROXY_INTERNAL_INTERFACE_(LSHandle::)
virtual AuSPtr<IO::IAsyncTransaction> NewAsyncTransaction() override;
virtual AuSPtr<Loop::ILoopSource> AsReadChannelIsOpen() override;
virtual AuSPtr<Loop::ILoopSource> AsReadChannelHasData() override;
virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblocking) override;
virtual bool Write(const Memory::MemoryViewStreamRead &read) override;
virtual AuString ExportToString() override;
bool IsSignaled() override;
bool WaitOn(AuUInt32 timeout) override;
Loop::ELoopSource GetType() override;
HANDLE GetPipeHandle();
HANDLE GetConnectHandle()
{
return (HANDLE)AuStaticCast<Loop::LSEvent>(this->hasClient_)->GetHandle();
}
void TryConnect();
OVERLAPPED overlapped {};
AuSPtr<Loop::ILSEvent> hasClient_;
private:
HANDLE serverHandle_ {INVALID_HANDLE_VALUE};
HANDLE clientHandle_ {INVALID_HANDLE_VALUE};
IPCHandle ipcHandle_;
AuSPtr<IO::FS::FileHandle> fsHandle_;
AuSPtr<IO::FS::NtAsyncFileStream> fsStream_;
AuSPtr<IPCHasConnectionEvent> lshasConnection_;
bool bFirstTime {true};
};
IPCHasConnectionEvent::IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent) : parent_(parent), LSHandle((AuUInt)parent->GetConnectHandle()) IPCHasConnectionEvent::IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent) : parent_(parent), LSHandle((AuUInt)parent->GetConnectHandle())
{ {
@ -135,8 +96,7 @@ namespace Aurora::IO::IPC
bool firstTime = AuExchange(bFirstTime, false); bool firstTime = AuExchange(bFirstTime, false);
if (firstTime || if (firstTime ||
(WaitForSingleObject(this->overlapped.hEvent, 0) == WAIT_OBJECT_0 && (WaitForSingleObject(this->overlapped.hEvent, 0) == WAIT_OBJECT_0 &&
GetOverlappedResult(this->serverHandle_, &this->overlapped, &idc, false)) (GetOverlappedResult(this->serverHandle_, &this->overlapped, &idc, false))))
)
{ {
ResetEvent(this->overlapped.hEvent); ResetEvent(this->overlapped.hEvent);
@ -156,10 +116,12 @@ namespace Aurora::IO::IPC
} }
else else
{ {
#if 0
if (WaitForSingleObject(this->overlapped.hEvent, 0) == WAIT_OBJECT_0) if (WaitForSingleObject(this->overlapped.hEvent, 0) == WAIT_OBJECT_0)
{ {
ResetEvent(this->overlapped.hEvent); ResetEvent(this->overlapped.hEvent);
} }
#endif
} }
} }
@ -190,7 +152,12 @@ namespace Aurora::IO::IPC
AuSPtr<IO::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction() AuSPtr<IO::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction()
{ {
return this->fsStream_->NewTransaction(); auto transaction = AuStaticCast<AuFS::NtAsyncFileTransaction>(this->fsStream_->NewTransaction());
if (transaction)
{
transaction->ntIpcPipeImpl = AuSharedFromThis();
}
return transaction;
} }
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblocking) bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblocking)
@ -260,6 +227,20 @@ namespace Aurora::IO::IPC
return this->clientHandle_ == INVALID_HANDLE_VALUE ? this->serverHandle_ : this->clientHandle_; return this->clientHandle_ == INVALID_HANDLE_VALUE ? this->serverHandle_ : this->clientHandle_;
} }
HANDLE IPCPipeImpl::GetConnectHandle()
{
return (HANDLE)AuStaticCast<Loop::LSEvent>(this->hasClient_)->GetHandle();
}
void IPCPipeImpl::OnEndOfReadStream()
{
// TODO: fire inverse LS
DisconnectNamedPipe(this->serverHandle_);
this->bFirstTime = true;
this->TryConnect();
}
bool IPCPipeImpl::IsSignaled() bool IPCPipeImpl::IsSignaled()
{ {
DWORD avail {}; DWORD avail {};

View File

@ -7,7 +7,51 @@
***/ ***/
#pragma once #pragma once
#include "IPC.hpp"
#include <Source/IO/IPC/IPCHandle.hpp>
namespace Aurora::IO::IPC namespace Aurora::IO::IPC
{ {
struct IPCHasConnectionEvent;
struct IPCHandle;
struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis<IPCPipeImpl>
{
IPCPipeImpl(HANDLE clientHandle, HANDLE serverHandle, const IPCHandle &handle);
~IPCPipeImpl();
PROXY_INTERNAL_INTERFACE_(LSHandle::)
virtual AuSPtr<IO::IAsyncTransaction> NewAsyncTransaction() override;
virtual AuSPtr<Loop::ILoopSource> AsReadChannelIsOpen() override;
virtual AuSPtr<Loop::ILoopSource> AsReadChannelHasData() override;
virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblocking) override;
virtual bool Write(const Memory::MemoryViewStreamRead &read) override;
virtual AuString ExportToString() override;
bool IsSignaled() override;
bool WaitOn(AuUInt32 timeout) override;
Loop::ELoopSource GetType() override;
HANDLE GetPipeHandle();
HANDLE GetConnectHandle();
void OnEndOfReadStream();
void TryConnect();
OVERLAPPED overlapped {};
AuSPtr<Loop::ILSEvent> hasClient_;
private:
HANDLE serverHandle_ {INVALID_HANDLE_VALUE};
HANDLE clientHandle_ {INVALID_HANDLE_VALUE};
IPCHandle ipcHandle_;
AuSPtr<IO::FS::FileHandle> fsHandle_;
AuSPtr<IO::FS::NtAsyncFileStream> fsStream_;
AuSPtr<IPCHasConnectionEvent> lshasConnection_;
bool bFirstTime {true};
};
} }