Jamie Reece Wilson
5cafefae5d
[*] Fix: IO objects not using the explicit slow sync primitives. I don't know how these weren't refactored; then again, I never properly got around to finishing the factories for fast/slow IO objects. In addition, some of these aren't even accessed through the ILoopSource interface, so it's not as critical a failure.
504 lines
13 KiB
C++
504 lines
13 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: AuIPCPipe.NT.cpp
|
|
Date: 2022-4-15
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "IPC.hpp"
|
|
#include "AuIPCHandle.hpp"
|
|
|
|
#include <Source/IO/Loop/ILoopSourceEx.hpp>
|
|
#include <Source/IO/Loop/LSHandle.hpp>
|
|
#include <Source/IO/Loop/LSEvent.hpp>
|
|
#include <Source/IO/FS/Async.NT.hpp>
|
|
|
|
#include <Source/IO/AuIOHandle.hpp>
|
|
|
|
#include "AuIPCPipe.NT.hpp"
|
|
|
|
namespace Aurora::IO::IPC
|
|
{
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
// Pipes
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
struct IPCPipeImpl;
|
|
|
|
struct IPCHasConnectionEvent : Loop::LSHandle
|
|
{
|
|
IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent);
|
|
|
|
bool IsSignaled() override;
|
|
Loop::ELoopSource GetType() override;
|
|
|
|
bool OnTrigger(AuUInt handle) override;
|
|
|
|
private:
|
|
AuWPtr<IPCPipeImpl> parent_;
|
|
};
|
|
|
|
/**
 * Binds the loop source to the connect-event handle of @p parent and stores a
 * weak reference back to it.
 *
 * The initializer list is written in the order C++ actually executes it
 * (base class first, then members); the previous member-before-base spelling
 * was misleading and triggers -Wreorder.
 *
 * @param parent owning pipe; must be non-null because its connect handle is
 *               read here
 */
IPCHasConnectionEvent::IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent) :
    LSHandle((AuUInt)parent->GetConnectHandle()),
    parent_(parent)
{
}
|
|
|
|
bool IPCHasConnectionEvent::IsSignaled()
|
|
{
|
|
return OnTrigger(0);
|
|
}
|
|
|
|
/// Identifies this loop source as the "IPC pipe has a client" source.
Loop::ELoopSource IPCHasConnectionEvent::GetType()
{
    constexpr auto kSourceType = Loop::ELoopSource::eSourceIPCHasClient;
    return kSourceType;
}
|
|
|
|
/**
 * Decides whether the parent pipe currently has a peer.
 *
 * @param handle unused
 * @return false when the parent pipe is gone, when its connect event is not
 *         signaled, or when the pipe cannot be peeked; true otherwise
 */
bool IPCHasConnectionEvent::OnTrigger(AuUInt handle)
{
    auto pPipe = this->parent_.lock();
    if (!pPipe)
    {
        SysPushErrorMem("IPC pipe is dead");
        return false;
    }

    // Give the pipe a chance to (re)arm its pending connect before sampling the event
    pPipe->TryConnect();

    const bool bEventSet = WaitForSingleObject(pPipe->overlapped.hEvent, 0) == WAIT_OBJECT_0;
    if (!bEventSet)
    {
        return false;
    }

    if (pPipe->clientHandle_ != INVALID_HANDLE_VALUE)
    {
        return true;
    }

    // The byte count is irrelevant here; only whether the peek itself succeeds matters
    DWORD dwAvailable {};
    auto hPipe = pPipe->GetPipeHandle();
    return PeekNamedPipe(hPipe, NULL, NULL, NULL, &dwAvailable, NULL) ? true : false;
}
|
|
|
|
/**
 * Wraps one end of a named pipe.
 *
 * Exactly one of @p clientHandle / @p serverHandle is expected to be valid
 * (see NewPipeEx and ImportPipe). On any allocation failure the object is
 * flagged with bDead and construction stops early; callers must check bDead.
 *
 * @param clientHandle pipe handle of the connecting end, or INVALID_HANDLE_VALUE
 * @param serverHandle pipe handle of the listening end, or INVALID_HANDLE_VALUE
 * @param handle       IPC handle used when exporting the pipe as a string
 */
IPCPipeImpl::IPCPipeImpl(HANDLE clientHandle, HANDLE serverHandle, const IPCHandle &handle) :
    serverHandle_(serverHandle),
    clientHandle_(clientHandle),
    ipcHandle_(handle),
    pipeReader_(this),
    pipeWriter_(this)
{
    // Only the listening end needs the event that backs the overlapped connect
    if (serverHandle != INVALID_HANDLE_VALUE)
    {
        if (!(this->hasClient_ = Loop::NewLSEventSlow(false, false, true)))
        {
            this->bDead = true;
            return;
        }
    }

    if (!(this->fsHandle_ = AuIO::IOHandleShared()))
    {
        this->bDead = true;
        return;
    }

    if (!(this->fsStream_ = AuMakeShared<IO::FS::NtAsyncFileStream>()))
    {
        this->bDead = true;
        return;
    }

    // Hand the raw pipe handle over to the generic IO handle; the result is intentionally ignored
    (void)this->fsHandle_->InitFromMove((AuUInt)this->GetPipeHandle());

    {
        auto pFileHandle = AuStaticCast<AFileHandle>(this->fsHandle_);
        pFileHandle->bIsAsync = true;
        pFileHandle->pIPCPipe = this; // cleared again in the destructor
    }

    if (!(this->fsBlockingStream_ = AuFS::OpenBlockingFileStreamFromHandleShared(this->fsHandle_)))
    {
        this->bDead = true;
        return;
    }

    this->fsStream_->Init(this->fsHandle_);

    // No-op for the client end; starts listening on the server end
    TryConnect();
}
|
|
|
|
void IPCPipeImpl::TryConnect()
|
|
{
|
|
if (this->serverHandle_ == INVALID_HANDLE_VALUE)
|
|
{
|
|
return;
|
|
}
|
|
|
|
this->overlapped.hEvent = GetConnectHandle();
|
|
|
|
if (AuExchange(bFirstTime, false) ||
|
|
(WaitForSingleObject(this->overlapped.hEvent, 0) == WAIT_OBJECT_0))
|
|
{
|
|
ResetEvent(this->overlapped.hEvent);
|
|
|
|
if (ConnectNamedPipe(this->serverHandle_, &this->overlapped))
|
|
{
|
|
this->bFirstTime = true;
|
|
TryConnect();
|
|
}
|
|
else
|
|
{
|
|
auto lastError = GetLastError();
|
|
|
|
if (lastError == ERROR_IO_PENDING)
|
|
{
|
|
// No-op
|
|
}
|
|
else if (lastError == ERROR_PIPE_CONNECTED)
|
|
{
|
|
SetEvent(this->overlapped.hEvent);
|
|
}
|
|
else if (lastError == ERROR_NO_DATA)
|
|
{
|
|
DisconnectNamedPipe(this->serverHandle_);
|
|
this->bFirstTime = true;
|
|
TryConnect();
|
|
}
|
|
else
|
|
{
|
|
SysPushErrorIO("{}", lastError);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Severs the back-pointer that the constructor stored in the shared file
 * handle, so the handle does not keep pointing at a destroyed pipe.
 */
IPCPipeImpl::~IPCPipeImpl()
{
    if (!this->fsHandle_)
    {
        return;
    }

    auto pFileHandle = AuStaticCast<AFileHandle>(this->fsHandle_);
    pFileHandle->pIPCPipe = nullptr;
}
|
|
|
|
/**
 * Returns the "has a client connected" loop source, creating it on first use.
 *
 * @return an empty pointer on the client end (no server handle); otherwise
 *         the cached IPCHasConnectionEvent, which may be null if allocation
 *         failed
 */
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelIsOpen()
{
    if (this->serverHandle_ == INVALID_HANDLE_VALUE)
    {
        return {};
    }

    if (this->lshasConnection_)
    {
        return this->lshasConnection_;
    }

    this->lshasConnection_ = AuMakeShared<IPCHasConnectionEvent>(AuSharedFromThis());
    return this->lshasConnection_;
}
|
|
|
|
/// The pipe object itself acts as the "has data" loop source (see IsSignaled).
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelHasData()
{
    AuSPtr<Loop::ILoopSource> pSelf = this->SharedFromThis();
    return pSelf;
}
|
|
|
|
/**
 * Creates an async transaction on the underlying stream and tags it with a
 * shared reference to this pipe.
 *
 * @return the transaction, or an empty pointer if the stream could not
 *         create one
 */
AuSPtr<IO::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction()
{
    auto pTransaction = AuStaticCast<AuFS::NtAsyncFileTransaction>(this->fsStream_->NewTransaction());
    if (!pTransaction)
    {
        return {};
    }

    pTransaction->pNtIpcPipeImpl = AuSharedFromThis();
    return pTransaction;
}
|
|
|
|
/// Exposes the shared IO handle that wraps the pipe handle.
AuSPtr<IO::IIOHandle> IPCPipeImpl::GetCurrentSharedDuplexHandles()
{
    AuSPtr<IO::IIOHandle> pHandle = this->fsHandle_;
    return pHandle;
}
|
|
|
|
/**
 * Reads from the pipe into @p write.
 *
 * - With a null destination pointer, only reports how many bytes could be
 *   read (bounded by write.length) through write.outVariable.
 * - With @p nonblocking set, the request is clamped to the bytes currently
 *   buffered in the pipe; if nothing is buffered the call succeeds with
 *   zero bytes.
 * - Otherwise an overlapped read is issued and waited on.
 *
 * @param write       destination view; outVariable receives the byte count
 * @param nonblocking never wait for more data than is already available
 * @return false if the pipe handle is invalid or any Win32 call fails
 */
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblocking)
{
    DWORD size = write.length;

    write.outVariable = 0;

    TryConnect();

    auto h = this->GetPipeHandle();
    if (h == INVALID_HANDLE_VALUE)
    {
        SysPushErrorUninitialized();
        return false;
    }

    if (nonblocking || !write.ptr)
    {
        DWORD avail {};
        if (!PeekNamedPipe(h, NULL, NULL, NULL, &avail, NULL))
        {
            return false;
        }

        if (!avail)
        {
            return true;
        }

        size = AuMin(size, avail);
    }

    // Size query only
    if (!write.ptr)
    {
        write.outVariable = size;
        return true;
    }

    OVERLAPPED a {};
    a.hEvent = CreateEventA(NULL, true, 0, NULL);
    if (!a.hEvent)
    {
        // Without a private event the overlapped wait would fall back to the
        // pipe handle itself, which is unsafe while other IO is in flight
        SysPushErrorIO("{}", GetLastError());
        return false;
    }

    bool bOk = ::ReadFile(h, write.ptr, size, NULL, &a) ||
               ::GetLastError() == ERROR_IO_PENDING;
    if (bOk)
    {
        // bWait = true blocks until completion; no separate wait is required
        bOk = ::GetOverlappedResult(h, &a, &size, true);
    }

    ::CloseHandle(a.hEvent);

    if (!bOk)
    {
        return false;
    }

    write.outVariable = size;
    return true;
}
|
|
|
|
/**
 * Writes the contents of @p read to the pipe and waits for completion.
 *
 * @param read source view; outVariable receives the number of bytes written
 * @return false if the pipe handle is invalid or any Win32 call fails
 */
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)
{
    read.outVariable = 0;

    auto h = this->GetPipeHandle();
    if (h == INVALID_HANDLE_VALUE)
    {
        SysPushErrorUninitialized();
        return false;
    }

    TryConnect();

    DWORD temp {};
    OVERLAPPED a {};
    a.hEvent = CreateEventA(NULL, true, 0, NULL);
    if (!a.hEvent)
    {
        // Without a private event the overlapped wait would fall back to the
        // pipe handle itself, which is unsafe while other IO is in flight
        SysPushErrorIO("{}", GetLastError());
        return false;
    }

    if (!::WriteFile(h, read.ptr, read.length, NULL, &a))
    {
        // Sample the error once so the logged value is the one that was tested
        const auto lastError = ::GetLastError();
        if (lastError != ERROR_IO_PENDING)
        {
            SysPushErrorIO("{}", lastError);
            ::CloseHandle(a.hEvent);
            return false;
        }
    }

    // bWait = true blocks until completion; no separate wait is required
    const bool bCompleted = ::GetOverlappedResult(h, &a, &temp, true);
    ::CloseHandle(a.hEvent);

    if (!bCompleted)
    {
        return false;
    }

    read.outVariable = temp;
    return true;
}
|
|
|
|
/// Returns the client handle when there is one, otherwise the server handle.
HANDLE IPCPipeImpl::GetPipeHandle()
{
    if (this->clientHandle_ != INVALID_HANDLE_VALUE)
    {
        return this->clientHandle_;
    }

    return this->serverHandle_;
}
|
|
|
|
/**
 * Returns the native handle of the hasClient_ event.
 *
 * NOTE(review): hasClient_ is only created by the constructor when a server
 * handle was supplied, so this presumably must only be called on the server
 * end - confirm against callers.
 */
HANDLE IPCPipeImpl::GetConnectHandle()
{
    auto pEvent = AuStaticCast<Loop::LSEvent>(this->hasClient_);
    return (HANDLE)pEvent->GetHandle();
}
|
|
|
|
void IPCPipeImpl::OnEndOfReadStream()
|
|
{
|
|
// TODO: fire inverse LS
|
|
|
|
DisconnectNamedPipe(this->serverHandle_);
|
|
this->bFirstTime = true;
|
|
this->TryConnect();
|
|
}
|
|
|
|
bool IPCPipeImpl::IsSignaled()
|
|
{
|
|
DWORD avail {};
|
|
|
|
TryConnect();
|
|
|
|
if (!PeekNamedPipe(this->GetPipeHandle(), NULL, NULL, NULL, &avail, NULL))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
return avail;
|
|
}
|
|
|
|
/// Waits via the LSHandle base implementation; @p timeout is passed through unchanged.
bool IPCPipeImpl::WaitOn(AuUInt32 timeout)
{
    const bool bSignaled = LSHandle::WaitOn(timeout);
    return bSignaled;
}
|
|
|
|
/// Identifies this loop source as the IPC read-pipe source.
Loop::ELoopSource IPCPipeImpl::GetType()
{
    constexpr auto kSourceType = Loop::ELoopSource::eSourceIPCReadPipe;
    return kSourceType;
}
|
|
|
|
/**
 * Serializes the IPC handle so another process can import the pipe.
 *
 * @return the handle string when this object has no client handle; an empty
 *         string when it does
 */
AuString IPCPipeImpl::ExportToString()
{
    TryConnect();

    if (this->clientHandle_ != INVALID_HANDLE_VALUE)
    {
        return AuString {};
    }

    return this->ipcHandle_.ToString();
}
|
|
|
|
/// Returns the blocking file-stream view created over the pipe handle in the constructor.
AuSPtr<IO::FS::IFileStream> IPCPipeImpl::ToFileStream()
{
    AuSPtr<IO::FS::IFileStream> pStream = this->fsBlockingStream_;
    return pStream;
}
|
|
|
|
/**
 * Returns the embedded reader as a shared pointer that shares ownership with
 * the pipe itself, so the pipe stays alive while the reader is held.
 */
AuSPtr<IStreamReader> IPCPipeImpl::ToStreamReader()
{
    auto pOwner = this->SharedFromThis();
    return AuSPtr<IStreamReader>(pOwner, &this->pipeReader_);
}
|
|
|
|
/**
 * Returns the embedded writer as a shared pointer that shares ownership with
 * the pipe itself, so the pipe stays alive while the writer is held.
 */
AuSPtr<IStreamWriter> IPCPipeImpl::ToStreamWriter()
{
    auto pOwner = this->SharedFromThis();
    return AuSPtr<IStreamWriter>(pOwner, &this->pipeWriter_);
}
|
|
|
|
/**
 * Creates the server end of a new, uniquely named duplex pipe.
 *
 * @param uBytesLength requested buffer size in bytes; rounded up to the page
 *                     size. Zero selects 16 pages (or 4096 bytes if the page
 *                     size is unknown).
 * @return the pipe object, or an empty pointer if the native pipe or the
 *         wrapper object could not be created
 */
AUKN_SYM AuSPtr<IPCPipe> NewPipeEx(AuUInt32 uBytesLength)
{
    IPC::IPCHandle handle;
    IPC::IPCToken token;
    token.NewId();

    handle.PushId(EIPCHandleType::eIPCPipe, token);

    // Computed once and reused for the pipe name (previously computed twice, with one result unused)
    auto path = token.ToNTPath();

#if defined(AURORA_PLATFORM_WIN32)
    auto name = "\\\\.\\pipe\\" + path;
#else
    auto name = "\\\\.\\pipe\\LOCAL\\" + path;
#endif
    auto pageSize = AuHwInfo::GetPageSize();
    auto maxLength = uBytesLength ? AuPageRound(uBytesLength, pageSize) : pageSize ? 16 * pageSize : 4096;
    auto pipeServer = CreateNamedPipeA(name.c_str(),
                                       PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_WRITE_THROUGH | FILE_FLAG_OVERLAPPED,
                                       PIPE_WAIT,
                                       1,
                                       maxLength,
                                       maxLength,
                                       NMPWAIT_WAIT_FOREVER,
                                       nullptr);
    if ((!pipeServer) ||
        (pipeServer == INVALID_HANDLE_VALUE))
    {
        SysPushErrorIO("{}", GetLastError());
        return {};
    }

    auto object = AuMakeShared<IPCPipeImpl>(INVALID_HANDLE_VALUE, pipeServer, handle);
    if (!object)
    {
        SysPushErrorMem();
        AuWin32CloseHandle(pipeServer);
        return {};
    }

    // The constructor reports allocation failures through bDead
    if (object->bDead)
    {
        SysPushErrorMemory();
        SysPushErrorNested();
        return {};
    }

    return object;
}
|
|
|
|
/// Creates a pipe with the default buffer size (NewPipeEx with a length of zero).
AUKN_SYM AuSPtr<IPCPipe> NewPipe()
{
    constexpr AuUInt32 kUseDefaultLength = 0;
    return NewPipeEx(kUseDefaultLength);
}
|
|
|
|
/**
 * Opens the client end of a pipe previously exported with ExportToString().
 *
 * @param handleString serialized IPC handle
 * @return the pipe object, or an empty pointer if the string cannot be
 *         parsed, the pipe cannot be opened, or the wrapper object could not
 *         be created
 */
AUKN_SYM AuSPtr<IPCPipe> ImportPipe(const AuString &handleString)
{
    IPCHandle handle;
    if (!handle.FromString(handleString))
    {
        SysPushErrorParseError();
        return {};
    }

    auto token = handle.GetToken(EIPCHandleType::eIPCPipe, 0);
    if (!token)
    {
        SysPushErrorParseError();
        return {};
    }

#if defined(AURORA_PLATFORM_WIN32)
    auto name = "\\\\.\\pipe\\" + token->token.ToNTPath();
#else
    auto name = "\\\\.\\pipe\\LOCAL\\" + token->token.ToNTPath();
#endif
    HANDLE pipe = Win32Open(AuLocale::ConvertFromUTF8(name).c_str(),
                            GENERIC_WRITE | GENERIC_READ,
                            0,
                            false,
                            OPEN_ALWAYS,
                            FILE_FLAG_OVERLAPPED,
                            FILE_ATTRIBUTE_NORMAL);

    const bool bOpened = pipe && (pipe != INVALID_HANDLE_VALUE);
    if (!bOpened)
    {
        const auto lastError = GetLastError();
        if (lastError == ERROR_PIPE_BUSY)
        {
            SysPushErrorIO("Pipe is used -> a client has already connected or the nt server is not ready");
        }
        else
        {
            SysPushErrorIO("{}", lastError);
        }
        return {};
    }

    auto object = AuMakeShared<IPCPipeImpl>(pipe, INVALID_HANDLE_VALUE, handle);
    if (!object)
    {
        SysPushErrorMemory();
        AuWin32CloseHandle(pipe);
        return {};
    }

    // The constructor reports allocation failures through bDead
    if (object->bDead)
    {
        SysPushErrorMemory();
        SysPushErrorNested();
        return {};
    }

    return object;
}
|
|
}
|
|
|
|
// > The pipe created by UWP process with name \\.\pipe\Local\PipeName is converted to \\.\pipe\Sessions\<SessionId>\AppContainerNamedObjects\<AppContainerSid>\PipeName.
|
|
// > I can use this to communicate between UWP as server and Win32 as client.
|
|
// https://jike.in/qa/?qa=103904/
|
|
// ...good to know
|