AuroraRuntime/Source/IO/IPC/AuIPCPipe.NT.cpp
2023-10-21 10:43:28 +01:00

457 lines
12 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIPCPipe.NT.cpp
Date: 2022-4-15
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "IPC.hpp"
#include "AuIPCHandle.hpp"
#include <Source/IO/Loop/ILoopSourceEx.hpp>
#include <Source/IO/Loop/LSHandle.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
#include <Source/IO/FS/Async.NT.hpp>
#include <Source/IO/AuIOHandle.hpp>
#include "AuIPCPipe.NT.hpp"
namespace Aurora::IO::IPC
{
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Pipes
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct IPCPipeImpl;
struct IPCHasConnectionEvent : Loop::LSHandle
{
IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent);
bool IsSignaled() override;
Loop::ELoopSource GetType() override;
bool OnTrigger(AuUInt handle) override;
private:
AuWPtr<IPCPipeImpl> parent_;
};
IPCHasConnectionEvent::IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent) : parent_(parent), LSHandle((AuUInt)parent->GetConnectHandle())
{
}
bool IPCHasConnectionEvent::IsSignaled()
{
return OnTrigger(0);
}
Loop::ELoopSource IPCHasConnectionEvent::GetType()
{
return Loop::ELoopSource::eSourceIPCHasClient;
}
bool IPCHasConnectionEvent::OnTrigger(AuUInt handle)
{
auto parent = this->parent_.lock();
if (!parent)
{
SysPushErrorMem("IPC pipe is dead");
return false;
}
parent->TryConnect();
if (WaitForSingleObject(parent->overlapped.hEvent, 0) != WAIT_OBJECT_0)
{
return false;
}
if (parent->clientHandle_ != INVALID_HANDLE_VALUE)
{
return true;
}
DWORD avail {};
auto h = parent->GetPipeHandle();
if (!PeekNamedPipe(h, NULL, NULL, NULL, &avail, NULL))
{
return false;
}
return true;
}
IPCPipeImpl::IPCPipeImpl(HANDLE clientHandle, HANDLE serverHandle, const IPCHandle &handle) :
serverHandle_(serverHandle),
clientHandle_(clientHandle),
ipcHandle_(handle),
pipeReader_(this),
pipeWriter_(this)
{
if (serverHandle != INVALID_HANDLE_VALUE)
{
this->hasClient_ = Loop::NewLSEvent(false, false, true);
SysAssert(this->hasClient_);
}
this->fsHandle_ = AuIO::IOHandleShared();
SysAssert(this->fsHandle_);
this->fsStream_ = AuMakeSharedPanic<IO::FS::NtAsyncFileStream>();
this->fsHandle_->InitFromMove((AuUInt)this->GetPipeHandle());
AuStaticCast<AFileHandle>(this->fsHandle_)->bIsAsync = true;
AuStaticCast<AFileHandle>(this->fsHandle_)->pIPCPipe = this;
this->fsBlockingStream_ = AuFS::OpenBlockingFileStreamFromHandle(this->fsHandle_);
SysAssert(this->fsBlockingStream_);
this->fsStream_->Init(this->fsHandle_);
TryConnect();
}
void IPCPipeImpl::TryConnect()
{
if (this->serverHandle_ == INVALID_HANDLE_VALUE)
{
return;
}
this->overlapped.hEvent = GetConnectHandle();
if (AuExchange(bFirstTime, false) ||
(WaitForSingleObject(this->overlapped.hEvent, 0) == WAIT_OBJECT_0))
{
ResetEvent(this->overlapped.hEvent);
if (ConnectNamedPipe(this->serverHandle_, &this->overlapped))
{
this->bFirstTime = true;
TryConnect();
}
else
{
auto lastError = GetLastError();
if (lastError == ERROR_IO_PENDING)
{
// No-op
}
else if (lastError == ERROR_PIPE_CONNECTED)
{
SetEvent(this->overlapped.hEvent);
}
else if (lastError == ERROR_NO_DATA)
{
DisconnectNamedPipe(this->serverHandle_);
this->bFirstTime = true;
TryConnect();
}
else
{
SysPushErrorIO("{}", lastError);
}
}
}
}
IPCPipeImpl::~IPCPipeImpl()
{
}
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelIsOpen()
{
if (this->serverHandle_ == INVALID_HANDLE_VALUE)
{
return {};
}
if (!this->lshasConnection_)
{
this->lshasConnection_ = AuMakeShared<IPCHasConnectionEvent>(AuSharedFromThis());
}
return this->lshasConnection_;
}
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelHasData()
{
return this->SharedFromThis();
}
AuSPtr<IO::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction()
{
auto transaction = AuStaticCast<AuFS::NtAsyncFileTransaction>(this->fsStream_->NewTransaction());
if (transaction)
{
transaction->pNtIpcPipeImpl = AuSharedFromThis();
}
return transaction;
}
AuSPtr<IO::IIOHandle> IPCPipeImpl::GetCurrentSharedDuplexHandles()
{
return this->fsHandle_;
}
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblocking)
{
DWORD size = write.length;
TryConnect();
auto h = this->GetPipeHandle();
if (h == INVALID_HANDLE_VALUE)
{
SysPushErrorUninitialized();
return false;
}
if (nonblocking || !write.ptr)
{
DWORD avail {};
if (!PeekNamedPipe(h, NULL, NULL, NULL, &avail, NULL))
{
return false;
}
if (!avail)
{
return true;
}
size = AuMin(size, avail);
}
if (!write.ptr)
{
write.outVariable = size;
return true;
}
OVERLAPPED a {};
a.hEvent = CreateEventA(NULL, true, 0, NULL);
if (!::ReadFile(h, write.ptr, size, NULL, &a) &&
::GetLastError() != ERROR_IO_PENDING)
{
::CloseHandle(a.hEvent);
return false;
}
::WaitForSingleObject(a.hEvent, 0);
if (!::GetOverlappedResult(h, &a, &size, true))
{
::CloseHandle(a.hEvent);
return false;
}
::CloseHandle(a.hEvent);
write.outVariable = size;
return true;
}
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)
{
auto h = this->GetPipeHandle();
if (h == INVALID_HANDLE_VALUE)
{
SysPushErrorUninitialized();
return false;
}
TryConnect();
DWORD temp;
OVERLAPPED a {};
a.hEvent = CreateEventA(NULL, true, 0, NULL);
if (!::WriteFile(h, read.ptr, read.length, NULL, &a) &&
::GetLastError() != ERROR_IO_PENDING)
{
SysPushErrorIO("{}", GetLastError());
::CloseHandle(a.hEvent);
return false;
}
::WaitForSingleObject(a.hEvent, 0);
if (!::GetOverlappedResult(h, &a, &temp, true))
{
::CloseHandle(a.hEvent);
return false;
}
::CloseHandle(a.hEvent);
read.outVariable = temp;
return true;
}
HANDLE IPCPipeImpl::GetPipeHandle()
{
return this->clientHandle_ == INVALID_HANDLE_VALUE ? this->serverHandle_ : this->clientHandle_;
}
HANDLE IPCPipeImpl::GetConnectHandle()
{
return (HANDLE)AuStaticCast<Loop::LSEvent>(this->hasClient_)->GetHandle();
}
void IPCPipeImpl::OnEndOfReadStream()
{
// TODO: fire inverse LS
DisconnectNamedPipe(this->serverHandle_);
this->bFirstTime = true;
this->TryConnect();
}
bool IPCPipeImpl::IsSignaled()
{
DWORD avail {};
TryConnect();
if (!PeekNamedPipe(this->GetPipeHandle(), NULL, NULL, NULL, &avail, NULL))
{
return false;
}
return avail;
}
bool IPCPipeImpl::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
Loop::ELoopSource IPCPipeImpl::GetType()
{
return Loop::ELoopSource::eSourceIPCReadPipe;
}
AuString IPCPipeImpl::ExportToString()
{
TryConnect();
return this->clientHandle_ == INVALID_HANDLE_VALUE ?
this->ipcHandle_.ToString() :
AuString {};
}
AuSPtr<IO::FS::IFileStream> IPCPipeImpl::ToFileStream()
{
return this->fsBlockingStream_;
}
AuSPtr<IStreamReader> IPCPipeImpl::ToStreamReader()
{
return AuSPtr<IStreamReader>(this->SharedFromThis(), &this->pipeReader_);
}
AuSPtr<IStreamWriter> IPCPipeImpl::ToStreamWriter()
{
return AuSPtr<IStreamWriter>(this->SharedFromThis(), &this->pipeWriter_);
}
AUKN_SYM AuSPtr<IPCPipe> NewPipeEx(AuUInt32 uBytesLength)
{
IPC::IPCHandle handle;
IPC::IPCToken token;
token.NewId();
handle.PushId(EIPCHandleType::eIPCPipe, token);
auto path = token.ToNTPath();
auto name = "\\\\.\\pipe\\" + token.ToNTPath();
auto pageSize = AuHwInfo::GetPageSize();
auto maxLength = uBytesLength ? AuPageRound(uBytesLength, pageSize) : pageSize ? 16 * pageSize : 4096;
auto pipeServer = CreateNamedPipeA(name.c_str(),
PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_WRITE_THROUGH | FILE_FLAG_OVERLAPPED,
PIPE_WAIT,
1,
maxLength,
maxLength,
NMPWAIT_WAIT_FOREVER,
nullptr);
if ((!pipeServer) ||
(pipeServer == INVALID_HANDLE_VALUE))
{
SysPushErrorIO("{}", GetLastError());
return {};
}
auto object = AuMakeShared<IPCPipeImpl>(INVALID_HANDLE_VALUE, pipeServer, handle);
if (!object)
{
SysPushErrorMem();
AuWin32CloseHandle(pipeServer);
return {};
}
return object;
}
AUKN_SYM AuSPtr<IPCPipe> NewPipe()
{
return NewPipeEx(0);
}
AUKN_SYM AuSPtr<IPCPipe> ImportPipe(const AuString &handleString)
{
IPCHandle handle;
HANDLE pipe;
if (!handle.FromString(handleString))
{
SysPushErrorParseError();
return {};
}
auto token = handle.GetToken(EIPCHandleType::eIPCPipe, 0);
if (!token)
{
SysPushErrorParseError();
return {};
}
auto name = "\\\\.\\pipe\\" + token->token.ToNTPath();
pipe = CreateFileA(name.c_str(),
GENERIC_WRITE | GENERIC_READ,
0,
NULL,
OPEN_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
NULL);
if ((!pipe) ||
(pipe == INVALID_HANDLE_VALUE))
{
if (GetLastError() == ERROR_PIPE_BUSY)
{
SysPushErrorIO("Pipe is used -> a client has already connected or the nt server is not ready");
return {};
}
SysPushErrorIO("{}", GetLastError());
return {};
}
auto object = AuMakeShared<IPCPipeImpl>(pipe, INVALID_HANDLE_VALUE, handle);
if (!object)
{
SysPushErrorMem();
AuWin32CloseHandle(pipe);
return {};
}
return object;
}
}
// > The pipe created by UWP process with name \\.\pipe\Local\PipeName is converted to \\.\pipe\Sessions\<SessionId>\AppContainerNamedObjects\<AppContainerSid>\PipeName.
// > I can use this to communicate between UWP as server and Win32 as client.
// https://jike.in/qa/?qa=103904/
// ...good to know