AuroraRuntime/Source/IO/IPC/IPCPipe.Unix.cpp
J Reece Wilson 64cb7404ba [+] Near 1:1 Linux IPC Pipe compared to the NT equivalent (~= CreateNamedPipeA(nMaxInstances=1, dwOpenMode=PIPE_ACCESS_DUPLEX, dwPipeMode=PIPE_TYPE_BYTE))
[+] Ability to bypass blocking limitation of certain io_submit reads, if the blocking subsystem is a pollable stream (ie: a pipe).
[*] Fixed major Linux bug where LoopQueue items weren't being submitted, if no dequeues were in the same tick
[*] Fix various Linux pipe related bugs
[*] Fix futex bug where the callback was nulled on server-release
2022-08-09 07:48:29 +01:00

499 lines
12 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IPCPipe.Linux.cpp
Date: 2022-4-14
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "IPC.hpp"
#include "IPCHandle.hpp"
#include "IPCPipe.Unix.hpp"
#include <Source/IO/UNIX/FDIpcServer.hpp>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <termios.h>
namespace Aurora::IO::IPC
{
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Pipes
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
IPCPipeImpl::~IPCPipeImpl()
{
int fd {-1};
if (secondary[0] == -1)
{
this->mutex_->Unlock();
this->event_->Reset();
}
if ((fd = AuExchange(fds[0], -1)) != -1)
{
IO::UNIX::FDServeEnd(readEnd_);
//::close(fd);
}
if ((fd = AuExchange(fds[1], -1)) != -1)
{
IO::UNIX::FDServeEnd(writeEnd_);
//::close(fd);
}
if ((fd = AuExchange(secondary[0], -1)) != -1)
{
::close(fd);
}
if ((fd = AuExchange(secondary[1], -1)) != -1)
{
::close(fd);
}
fsHandle_.reset();
fsStream_.reset();
}
void IPCPipeImpl::DrainOtherFd()
{
// TODO (Reece): urgent
}
AuUInt IPCPipeImpl::GetPreemptFd()
{
return this->eventPreempt_.GetHandle();
}
bool IPCPipeImpl::LIOS_PopOne()
{
if (!this->bIsSendingZero)
{
return false;
}
#if defined(FIONREAD)
int available {};
int res = ::ioctl(fds[0], FIONREAD, &available) ;
if (res< 0)
{
return false;
}
if (available)
{
return false;
}
#endif
this->FinishFinalize();
return true;
}
void IPCPipeImpl::FinishFinalize()
{
this->bIsSendingZero = false;
this->bHasDied = true;
}
const AuList<AuUInt> &IPCPipeImpl::GetHandles()
{
return this->handles;
}
bool IPCPipeImpl::Singular()
{
return false;
}
bool IPCPipeImpl::OnTrigger(AuUInt handle)
{
return true;
}
IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex) :
fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]},
readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex),
eventPreempt_(false, false, true)
{
this->handles = {fds[0], eventPreempt_.GetHandle()};
#if defined(AURORA_IS_LINUX_DERIVED)
if (fds3[0] != -1)
{
AuStaticCast<IPCMutexProxy>(mutex)->pMutexClosedHook = this;
}
#endif
this->fsHandle_ = AuMakeShared<IO::FS::FileHandle>();
this->fsStream_ = AuMakeShared<IO::FS::LinuxAsyncFileStream>();
this->fsHandle_->pIPCPipe = this;
this->fsHandle_->Init(fds2[0], fds2[1]);
this->fsStream_->Init(this->fsHandle_);
}
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelIsOpen()
{
return this->event_;
}
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelHasData()
{
return AuSharedFromThis();
}
AuSPtr<IO::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction()
{
return this->fsStream_->NewTransaction();
}
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock)
{
auto handle = fds[0];
auto control = ::fcntl(handle, F_GETFL);
auto ref = control;
if (nonblock)
{
control |= O_NONBLOCK;
}
else
{
control &= ~O_NONBLOCK;
}
if (ref != control)
{
::fcntl(handle, F_SETFL, control);
}
if (!write.ptr)
{
#if defined(FIONREAD)
int available {};
if (::ioctl(fds[0], FIONREAD, &available) < 0)
{
return false;
}
write.outVariable = available;
return true;
#else
SysPushErrorHAL("Cannot pull bytes available in the IPC on this UNIX platform");
return false;
#endif
}
int tmp;
do
{
tmp = ::read(handle, write.ptr, write.length);
}
while ((tmp == -1 && errno == EINTR));
if (tmp <= 0)
{
if (tmp == 0)
{
return nonblock;
}
SysPushErrorMem();
return false;
}
write.outVariable = tmp;
return true;
}
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)//, bool nonblock)
{
bool nonblock = true;
auto handle = this->fds[1];
auto control = ::fcntl(handle, F_GETFL);
auto ref = control;
if (nonblock)
{
control |= O_NONBLOCK;
}
else
{
control &= ~O_NONBLOCK;
}
if (ref != control)
{
::fcntl(handle, F_SETFL, control);
}
int tmp;
do
{
tmp = ::write(handle, read.ptr, read.length);
}
while ((tmp == -1 && errno == EINTR));
if (tmp <= 0)
{
if (tmp == 0)
{
return nonblock;
}
SysPushErrorMem();
return false;
}
read.outVariable = tmp;
return true;
}
bool IPCPipeImpl::IsSignaled()
{
return LSHandle::IsSignaled();
}
bool IPCPipeImpl::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
Loop::ELoopSource IPCPipeImpl::GetType()
{
return Loop::ELoopSource::eSourceIPCReadPipe;
}
#if defined(AURORA_IS_LINUX_DERIVED)
bool IPCPipeImpl::OnClosed()
{
if (this->bHasDied)
{
this->DrainOtherFd();
this->bHasDied = false;
this->eventPreempt_.Reset();
this->event_->Reset();
return true;
}
this->SendTerminateSignal();
return false;
}
#endif
void IPCPipeImpl::SendTerminateSignal()
{
this->bIsSendingZero = true;
this->eventPreempt_.Set();
this->event_->Reset();
}
AuString IPCPipeImpl::ExportToString()
{
IPC::IPCHandle handle;
handle.PushId(EIPCHandleType::eIPCPipe, AuStaticCast<IPCEventProxy>(this->event_)->handle_.values[0].token);
#if defined(AURORA_IS_LINUX_DERIVED)
auto that = AuStaticCast<IPCMutexProxy>(this->mutex_);
handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, that->token_);
SysAssert(that->mem_);
handle.PushId(EIPCHandleType::eIPCMemory, AuStaticCast<IPCSharedMemoryImpl>(that->mem_)->handle_.values[0].token);
handle.values[1].token.word = that->index_;
#else
handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, ->handle_.values[0].token);
#endif
handle.PushId(EIPCHandleType::eIPCPipeEnd, this->readEnd_);
handle.PushId(EIPCHandleType::eIPCPipeEnd, this->writeEnd_);
return handle.ToString();
}
AUKN_SYM AuSPtr<IPCPipe> NewPipe()
{
IPCToken readEnd, writeEnd;
int fds1[2];
int fds2[2];
auto event = NewEvent(false, false);
if (!event)
{
SysPushErrorNested();
return {};
}
auto mutex = NewMutex();
if (!mutex)
{
SysPushErrorNested();
return {};
}
if (::pipe(fds1) == -1)
{
SysPushErrorIO();
return {};
}
if (::pipe(fds2) == -1)
{
SysPushErrorIO();
::close(fds1[0]);
::close(fds1[1]);
return {};
}
if (!IO::UNIX::FDServe(fds1[0], readEnd))
{
SysPushErrorIO();
::close(fds1[0]);
::close(fds1[1]);
::close(fds2[0]);
::close(fds2[1]);
return {};
}
if (!IO::UNIX::FDServe(fds2[1], writeEnd))
{
SysPushErrorIO();
IO::UNIX::FDServeEnd(readEnd);
::close(fds1[0]);
::close(fds1[1]);
::close(fds2[0]);
::close(fds2[1]);
return {};
}
int serverPair[2] {fds2[0], fds1[1]};
int clientPair[2] {fds1[0], fds2[1]};
auto handle = AuMakeShared<IPCPipeImpl>(serverPair, clientPair, readEnd, writeEnd, event, mutex);
if (!handle)
{
SysPushErrorMem();
IO::UNIX::FDServeEnd(readEnd);
IO::UNIX::FDServeEnd(writeEnd);
::close(fds1[0]);
::close(fds1[1]);
::close(fds2[0]);
::close(fds2[1]);
return {};
}
return handle;
}
AUKN_SYM AuSPtr<IPCPipe> ImportPipe(const AuString &handleString)
{
int fds[2] {-1, -1};
IPCHandle handle;
if (!handle.FromString(handleString))
{
SysPushErrorParseError();
return {};
}
auto eventVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipe, 0);
if (!eventVal)
{
SysPushErrorParseError();
return {};
}
auto mutexVal = handle.GetToken(IPC::EIPCHandleType::eIPCPrimitiveMutex, 1);
if (!mutexVal)
{
SysPushErrorParseError();
return {};
}
int offset {};
#if defined(AURORA_IS_LINUX_DERIVED)
offset = 1;
auto ipcMem = handle.GetToken(IPC::EIPCHandleType::eIPCMemory, 2);
if (!ipcMem)
{
SysPushErrorParseError();
return {};
}
#endif
auto readVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 2 + offset);
if (!readVal)
{
SysPushErrorParseError();
return {};
}
auto writeVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 3 + offset);
if (!writeVal)
{
SysPushErrorParseError();
return {};
}
auto event = ImportEventEx(eventVal->token);
if (!event)
{
return {};
}
auto mutex = ImportMutexEx(mutexVal->token, ipcMem->token, mutexVal->token.word);
if (!mutex)
{
return {};
}
if (!IO::UNIX::FDAccept(readVal->token, fds[0]))
{
SysPushErrorNested();
return {};
}
if (!IO::UNIX::FDAccept(writeVal->token, fds[1]))
{
::close(fds[0]);
SysPushErrorNested();
return {};
}
int dummy[2] {-1, -1};
auto object = AuMakeShared<IPCPipeImpl>(fds,
dummy,
readVal->token,
writeVal->token,
event,
mutex);
if (!object)
{
SysPushErrorMem();
::close(fds[0]);
::close(fds[1]);
return {};
}
mutex->WaitOn(0);
if (event->IsSignaled())
{
mutex->Unlock();
SysPushErrorIO("Pipe Busy");
::close(fds[0]);
::close(fds[1]);
return {};
}
event->Set();
return object;
}
}