/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AuIPCPipe.Linux.cpp
    Date: 2022-4-14
    Author: Reece
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include "IPC.hpp"
|
|
#include "AuIPCHandle.hpp"
|
|
#include "AuIPCPipe.Unix.hpp"
|
|
#include <Source/IO/UNIX/FDIpcServer.hpp>
|
|
#include <fcntl.h>
|
|
#include <sys/ioctl.h>
|
|
#include <termios.h>
|
|
#include <Source/IO/AuIOHandle.hpp>
|
|
|
|
namespace Aurora::IO::IPC
|
|
{
|
|
// Largest chunk, in bytes, that IPCPipeImpl::Write hands to PosixWrite per call (16 KiB).
static const AuUInt64 kFileCopyBlock = 16 * 1024;
|
|
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
// Pipes
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Tears the pipe down: detaches the IO handle's back-pointer, releases the
// shared mutex/event when no secondary pair is held, ends FD serving for both
// primary ends, closes the secondary descriptors and drops the stream objects.
IPCPipeImpl::~IPCPipeImpl()
{
    // The IO handle may outlive this object; make sure it no longer points at us.
    if (this->fsHandle_)
    {
        AuStaticCast<AFileHandle>(this->fsHandle_)->pIPCPipe = nullptr;
    }

    // No secondary read descriptor (ImportPipe constructs the object with a {-1, -1} pair):
    // give the mutex back and clear the event.
    if (this->secondary[0] == -1)
    {
        this->mutex_->Unlock();
        this->event_->Reset();
    }

    // The primary descriptors are only forgotten here, never ::close()d
    // (presumably fsHandle_ owns them after InitFromPairMove - TODO confirm).
    if (AuExchange(this->fds[0], -1) != -1)
    {
        IO::UNIX::FDServeEnd(this->readEnd_);
    }

    if (AuExchange(this->fds[1], -1) != -1)
    {
        IO::UNIX::FDServeEnd(this->writeEnd_);
    }

    // The secondary descriptors are closed directly, read end first.
    for (auto &sideFd : this->secondary)
    {
        const int iOld = AuExchange(sideFd, -1);
        if (iOld != -1)
        {
            ::close(iOld);
        }
    }

    this->fsHandle_.reset();
    this->fsStream_.reset();
}
|
|
|
|
void IPCPipeImpl::DrainOtherFd()
|
|
{
|
|
#if defined(FIONREAD)
|
|
|
|
int available {};
|
|
if (::ioctl(this->secondary[0], FIONREAD, &available) < 0)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (!available)
|
|
{
|
|
return;
|
|
}
|
|
|
|
AuByteBuffer temp(available);
|
|
SysAssert(temp);
|
|
|
|
AuUInt uCounter {};
|
|
while (this->ReadEx({temp, uCounter}, true, false))
|
|
{
|
|
temp.writePtr = temp.readPtr = temp.base;
|
|
}
|
|
|
|
#else
|
|
|
|
AuByteBuffer temp(4096 * 10);
|
|
SysAssert(temp);
|
|
|
|
AuUInt uCounter {};
|
|
while (this->ReadEx({temp, uCounter}, true, false))
|
|
{
|
|
temp.writePtr = temp.readPtr = temp.base;
|
|
}
|
|
|
|
#endif
|
|
}
|
|
|
|
// Returns the handle of the internal preempt event (eventPreempt_).
AuUInt IPCPipeImpl::GetPreemptFd()
{
    const AuUInt uPreemptHandle = this->eventPreempt_.GetHandle();
    return uPreemptHandle;
}
|
|
|
|
bool IPCPipeImpl::LIOS_PopOne()
|
|
{
|
|
if (!this->bIsSendingZero)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
#if defined(FIONREAD)
|
|
int available {};
|
|
if (::ioctl(fds[0], FIONREAD, &available) < 0)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
if (available)
|
|
{
|
|
return false;
|
|
}
|
|
#endif
|
|
|
|
this->FinishFinalize();
|
|
return true;
|
|
}
|
|
|
|
void IPCPipeImpl::FinishFinalize()
|
|
{
|
|
this->bIsSendingZero = false;
|
|
this->bHasDied = true;
|
|
}
|
|
|
|
// Exposes the handle list built by the constructor
// (primary read descriptor followed by the preempt event handle).
const AuList<AuUInt> &IPCPipeImpl::GetHandles()
{
    const AuList<AuUInt> &list = this->handles;
    return list;
}
|
|
|
|
// Always false: this source reports its handles through GetHandles().
bool IPCPipeImpl::Singular()
{
    constexpr bool kIsSingular = false;
    return kIsSingular;
}
|
|
|
|
// Trigger callback; unconditionally returns true regardless of which handle fired.
bool IPCPipeImpl::OnTrigger(AuUInt handle)
{
    (void)handle; // intentionally unused
    return true;
}
|
|
|
|
IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex) :
|
|
fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]},
|
|
readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex),
|
|
eventPreempt_(false, false, true),
|
|
pipeReader_(this),
|
|
pipeWriter_(this)
|
|
{
|
|
this->handles = {fds[0], eventPreempt_.GetHandle()};
|
|
|
|
#if defined(AURORA_IS_LINUX_DERIVED)
|
|
if (fds3[0] != -1)
|
|
{
|
|
AuStaticCast<IPCMutexProxy>(mutex)->pMutexClosedHook = this;
|
|
}
|
|
#endif
|
|
|
|
this->fsHandle_ = AuIO::IOHandleShared();
|
|
if (!this->fsHandle_)
|
|
{
|
|
this->bDead = true;
|
|
return;
|
|
}
|
|
|
|
this->fsStream_ = AuMakeShared<IO::FS::LinuxAsyncFileStream>();
|
|
if (!this->fsStream_)
|
|
{
|
|
this->bDead = true;
|
|
return;
|
|
}
|
|
|
|
AuStaticCast<AFileHandle>(this->fsHandle_)->pIPCPipe = this;
|
|
|
|
(void)this->fsHandle_->InitFromPairMove(fds2[0], fds2[1]);
|
|
this->fsStream_->Init(this->fsHandle_);
|
|
|
|
this->fsBlockingStream_ = AuFS::OpenBlockingFileStreamFromHandleShared(this->fsHandle_);
|
|
if (!this->fsBlockingStream_)
|
|
{
|
|
this->bDead = true;
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Loop source for the "channel is open" condition: the shared IPC event.
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelIsOpen()
{
    AuSPtr<Loop::ILoopSource> pSource = this->event_;
    return pSource;
}
|
|
|
|
// Loop source for the "data available" condition: the pipe object itself.
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelHasData()
{
    AuSPtr<Loop::ILoopSource> pSelf = AuSharedFromThis();
    return pSelf;
}
|
|
|
|
// Creates a new asynchronous transaction on the pipe's async file stream.
AuSPtr<IO::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction()
{
    AuSPtr<IO::IAsyncTransaction> pTransaction = this->fsStream_->NewTransaction();
    return pTransaction;
}
|
|
|
|
// Returns the IO handle that was initialised with the primary descriptor pair.
AuSPtr<IO::IIOHandle> IPCPipeImpl::GetCurrentSharedDuplexHandles()
{
    AuSPtr<IO::IIOHandle> pHandle = this->fsHandle_;
    return pHandle;
}
|
|
|
|
// Reads from the primary read descriptor; thin wrapper over ReadEx with bSide = true.
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock)
{
    constexpr bool kPrimarySide = true;
    return this->ReadEx(write, nonblock, kPrimarySide);
}
|
|
|
|
// Reads from one of the pipe's read descriptors.
//
//  write    - destination view. When write.ptr is null, no data is consumed and
//             write.outVariable receives the number of bytes pending (FIONREAD).
//             Otherwise up to write.length bytes are read into write.ptr and
//             write.outVariable receives the byte count.
//  nonblock - selects O_NONBLOCK on the descriptor for this and later calls.
//  bSide    - true: primary read end (fds[0]); false: secondary read end (secondary[0]).
//
// Returns true when bytes were read (or the pending count was fetched). A
// zero-byte read reports a count of 0 and returns `nonblock`. Any failure,
// including "no data yet" on a non-blocking descriptor, returns false.
bool IPCPipeImpl::ReadEx(const Memory::MemoryViewStreamWrite &write, bool nonblock, bool bSide)
{
    const int handle = bSide ? this->fds[0] : this->secondary[0];

    const int currentFlags = ::fcntl(handle, F_GETFL);
    if (currentFlags < 0)
    {
        return false;
    }

    // Only touch the descriptor flags when the blocking mode actually changes.
    const int wantedFlags = nonblock ? (currentFlags | O_NONBLOCK)
                                     : (currentFlags & ~O_NONBLOCK);
    if (wantedFlags != currentFlags)
    {
        ::fcntl(handle, F_SETFL, wantedFlags);
    }

    // Null destination: the caller only wants to know how much is pending.
    if (!write.ptr)
    {
    #if defined(FIONREAD)
        int available {};
        if (::ioctl(handle, FIONREAD, &available) < 0)
        {
            return false;
        }

        write.outVariable = available;
        return true;
    #else
        SysPushErrorHAL("Cannot pull bytes available in the IPC on this UNIX platform");
        return false;
    #endif
    }

    // ssize_t, not int: read() may legitimately return more than INT_MAX.
    ssize_t iRead {};
    do
    {
        iRead = ::read(handle, write.ptr, write.length);
    }
    while (iRead == -1 && errno == EINTR);

    if (iRead > 0)
    {
        write.outVariable = AuUInt(iRead);
        return true;
    }

    if (iRead == 0)
    {
        // Nothing transferred: say so explicitly instead of leaving a stale count behind.
        write.outVariable = 0;
        return nonblock;
    }

    // An empty non-blocking pipe is an expected condition, not an error worth logging.
    const int iError = errno;
    const bool bWouldBlock = nonblock && (iError == EAGAIN || iError == EWOULDBLOCK);
    if (!bWouldBlock)
    {
        SysPushErrorIO();
    }

    return false;
}
|
|
|
|
// Writes as much of `read` as the primary write descriptor (fds[1]) accepts
// without blocking, in chunks of at most kFileCopyBlock bytes.
//
// read.outVariable receives the number of bytes accepted by the pipe, including
// the case where a later chunk is refused after earlier chunks went through.
// Previously that partial count was dropped, so a caller could resend data the
// peer had already received.
//
// Returns true in non-blocking mode (the only mode currently compiled in) even
// when nothing could be written; the byte count tells the caller what happened.
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)//, bool nonblock)
{
    bool nonblock = true;
    const int handle = this->fds[1];

    // Best effort: switch the descriptor mode only if the flags can be read and differ.
    const int currentFlags = ::fcntl(handle, F_GETFL);
    if (currentFlags >= 0)
    {
        const int wantedFlags = nonblock ? (currentFlags | O_NONBLOCK)
                                         : (currentFlags & ~O_NONBLOCK);
        if (wantedFlags != currentFlags)
        {
            ::fcntl(handle, F_SETFL, wantedFlags);
        }
    }

    AuUInt length = read.length;
    AuUInt offset {0};

    while (length)
    {
        AuUInt32 written {};

        int blockSize = AuMin(AuUInt(kFileCopyBlock), length);

        if (!PosixWrite(handle, &reinterpret_cast<const char *>(read.ptr)[offset], blockSize, &written))
        {
            if (!nonblock)
            {
                SysPushErrorNested("File Error: {}", this->fsHandle_->GetPath());
                return false;
            }

            // Non-blocking: stop here and report whatever was written so far.
            break;
        }

        // A successful call that moved nothing would never terminate the loop.
        if (!written)
        {
            break;
        }

        offset += written;
        length -= written;
    }

    if (!offset)
    {
        read.outVariable = 0;
        return nonblock;
    }

    read.outVariable = offset;
    return true;
}
|
|
|
|
bool IPCPipeImpl::IsSignaled()
|
|
{
|
|
return LSHandle::IsSignaled();
|
|
}
|
|
|
|
// Forwards to the LSHandle implementation with the caller's timeout.
bool IPCPipeImpl::WaitOn(AuUInt32 timeout)
{
    const bool bResult = LSHandle::WaitOn(timeout);
    return bResult;
}
|
|
|
|
// Identifies this loop source as an IPC read pipe.
Loop::ELoopSource IPCPipeImpl::GetType()
{
    constexpr auto kSourceType = Loop::ELoopSource::eSourceIPCReadPipe;
    return kSourceType;
}
|
|
|
|
#if defined(AURORA_IS_LINUX_DERIVED)
// Closed-hook callback (the constructor registers this object as the mutex's pMutexClosedHook).
//
// First call (bHasDied not yet set): raise the terminate signal and return false.
// Once FinishFinalize() has set bHasDied: drain the secondary read end, clear
// the flag, reset both events and return true.
bool IPCPipeImpl::OnClosed()
{
    if (!this->bHasDied)
    {
        this->SendTerminateSignal();
        return false;
    }

    this->DrainOtherFd();
    this->bHasDied = false;
    this->eventPreempt_.Reset();
    this->event_->Reset();
    return true;
}
#endif
|
|
|
|
void IPCPipeImpl::SendTerminateSignal()
|
|
{
|
|
this->bIsSendingZero = true;
|
|
this->eventPreempt_.Set();
|
|
this->event_->Reset();
|
|
}
|
|
|
|
// Serialises everything another process needs to attach via ImportPipe().
//
// Token order (ImportPipe reads them back by index):
//   0: event, 1: mutex, [2: shared memory - Linux-derived targets only],
//   then the read-end token followed by the write-end token.
AuString IPCPipeImpl::ExportToString()
{
    IPC::IPCHandle handle;

    handle.PushId(EIPCHandleType::eIPCPipe, AuStaticCast<IPCEventProxy>(this->event_)->handle_.values[0].token);

#if defined(AURORA_IS_LINUX_DERIVED)
    auto that = AuStaticCast<IPCMutexProxy>(this->mutex_);
    handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, that->token_);
    SysAssert(that->mem_);
    handle.PushId(EIPCHandleType::eIPCMemory, AuStaticCast<IPCSharedMemoryImpl>(that->mem_)->handle_.values[0].token);
    // The mutex's index is stored in the mutex entry's token word.
    handle.values[1].token.word = that->index_;
#else
    // The original expression here had no object in front of `->handle_` and could not compile.
    // NOTE(review): assumes the non-Linux IPCMutexProxy exposes `handle_` the same way
    // IPCEventProxy does above - confirm against the proxy declaration.
    handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, AuStaticCast<IPCMutexProxy>(this->mutex_)->handle_.values[0].token);
#endif

    handle.PushId(EIPCHandleType::eIPCPipeEnd, this->readEnd_);
    handle.PushId(EIPCHandleType::eIPCPipeEnd, this->writeEnd_);

    return handle.ToString();
}
|
|
|
|
// Returns the blocking file stream opened over the pipe's IO handle by the constructor.
AuSPtr<IO::FS::IFileStream> IPCPipeImpl::ToFileStream()
{
    AuSPtr<IO::FS::IFileStream> pStream = this->fsBlockingStream_;
    return pStream;
}
|
|
|
|
// Returns the embedded reader as a shared pointer that shares ownership with the pipe.
AuSPtr<IStreamReader> IPCPipeImpl::ToStreamReader()
{
    auto pOwner = this->SharedFromThis();
    return AuSPtr<IStreamReader>(pOwner, &this->pipeReader_);
}
|
|
|
|
// Returns the embedded writer as a shared pointer that shares ownership with the pipe.
AuSPtr<IStreamWriter> IPCPipeImpl::ToStreamWriter()
{
    auto pOwner = this->SharedFromThis();
    return AuSPtr<IStreamWriter>(pOwner, &this->pipeWriter_);
}
|
|
|
|
// Creates the serving side of a duplex IPC pipe.
//
// Two kernel pipes are opened, one per direction. The server keeps
// {fds2[0], fds1[1]} as its primary pair; {fds1[0], fds2[1]} are published
// through FDServe and retained as the secondary pair.
//
//  uPipeLength - requested pipe capacity in bytes (rounded up to the page size);
//                0 keeps the system default. Only honoured where F_SETPIPE_SZ exists.
//
// Returns an empty pointer on any failure.
AUKN_SYM AuSPtr<IPCPipe> NewPipeEx(AuUInt32 uPipeLength)
{
    IPCToken readEnd, writeEnd;
    int fds1[2];
    int fds2[2];

    const auto closePair = [](int (&pair)[2])
    {
        ::close(pair[0]);
        ::close(pair[1]);
    };

    const auto closeBoth = [&]()
    {
        closePair(fds1);
        closePair(fds2);
    };

    auto event = NewEvent(false, false);
    if (!event)
    {
        SysPushErrorNested();
        return {};
    }

    auto mutex = NewMutex();
    if (!mutex)
    {
        SysPushErrorNested();
        return {};
    }

    if (::pipe(fds1) == -1)
    {
        SysPushErrorIO("no pipe");
        return {};
    }

    if (::pipe(fds2) == -1)
    {
        SysPushErrorIO("no pipe");
        closePair(fds1);
        return {};
    }

#if defined(F_SETPIPE_SZ)
    if (uPipeLength)
    {
        uPipeLength = AuPageRound(uPipeLength, AuHwInfo::GetPageSize());

        // Both directions must be resized; the second is not attempted if the first fails.
        const bool bResized = fcntl(fds1[1], F_SETPIPE_SZ, uPipeLength) != -1 &&
                              fcntl(fds2[1], F_SETPIPE_SZ, uPipeLength) != -1;
        if (!bResized)
        {
            SysPushErrorIO("couldnt expand pipe");
            closeBoth();
            return {};
        }
    }
#endif

    // Publish the client's read end...
    if (!IO::UNIX::FDServe(fds1[0], readEnd))
    {
        SysPushErrorIO();
        closeBoth();
        return {};
    }

    // ...and the client's write end.
    if (!IO::UNIX::FDServe(fds2[1], writeEnd))
    {
        SysPushErrorIO();
        IO::UNIX::FDServeEnd(readEnd);
        closeBoth();
        return {};
    }

    int serverPair[2] {fds2[0], fds1[1]};
    int clientPair[2] {fds1[0], fds2[1]};

    auto handle = AuMakeShared<IPCPipeImpl>(serverPair, clientPair, readEnd, writeEnd, event, mutex);
    if (!handle)
    {
        SysPushErrorMem();
        IO::UNIX::FDServeEnd(readEnd);
        IO::UNIX::FDServeEnd(writeEnd);
        closeBoth();
        return {};
    }

    // The constructor flags partial initialisation through bDead;
    // dropping the object here leaves the cleanup to its destructor.
    if (handle->bDead)
    {
        return {};
    }

    return handle;
}
|
|
|
|
// Creates a serving pipe with the system's default pipe capacity.
AUKN_SYM AuSPtr<IPCPipe> NewPipe()
{
    constexpr AuUInt32 kDefaultPipeLength = 0;
    return NewPipeEx(kDefaultPipeLength);
}
|
|
|
|
// Attaches to a pipe previously exported with IPCPipeImpl::ExportToString().
//
// Steps: parse the handle string, look up the event / mutex / (Linux: shared
// memory) / read-end / write-end tokens, import the event and mutex, accept
// both descriptors, construct the pipe object, then take the mutex and set the
// event. If the event is already signaled the attach is rejected ("Pipe Busy").
//
// Returns an empty pointer on any failure.
AUKN_SYM AuSPtr<IPCPipe> ImportPipe(const AuString &handleString)
{
    int fds[2] {-1, -1};

    IPCHandle handle;
    if (!handle.FromString(handleString))
    {
        SysPushErrorParseError();
        return {};
    }

    auto eventVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipe, 0);
    if (!eventVal)
    {
        SysPushErrorParseError();
        return {};
    }

    auto mutexVal = handle.GetToken(IPC::EIPCHandleType::eIPCPrimitiveMutex, 1);
    if (!mutexVal)
    {
        SysPushErrorParseError();
        return {};
    }

    // Linux-derived exports carry an extra shared-memory token at index 2,
    // which shifts the two pipe-end tokens by one.
#if defined(AURORA_IS_LINUX_DERIVED)
    const int offset = 1;

    auto ipcMem = handle.GetToken(IPC::EIPCHandleType::eIPCMemory, 2);
    if (!ipcMem)
    {
        SysPushErrorParseError();
        return {};
    }
#else
    const int offset = 0;
#endif

    auto readVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 2 + offset);
    if (!readVal)
    {
        SysPushErrorParseError();
        return {};
    }

    auto writeVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 3 + offset);
    if (!writeVal)
    {
        SysPushErrorParseError();
        return {};
    }

    auto event = ImportEventEx(eventVal->token);
    if (!event)
    {
        return {};
    }

    // NOTE(review): ipcMem is only declared on Linux-derived targets, so this line
    // does not compile elsewhere as written.
    auto mutex = ImportMutexEx(mutexVal->token, ipcMem->token, mutexVal->token.word);
    if (!mutex)
    {
        return {};
    }

    if (!IO::UNIX::FDAccept(readVal->token, fds[0]))
    {
        SysPushErrorNested();
        return {};
    }

    if (!IO::UNIX::FDAccept(writeVal->token, fds[1]))
    {
        ::close(fds[0]);
        SysPushErrorNested();
        return {};
    }

    // An imported pipe has no secondary descriptor pair.
    int dummy[2] {-1, -1};
    auto object = AuMakeShared<IPCPipeImpl>(fds, dummy, readVal->token, writeVal->token, event, mutex);
    if (!object)
    {
        SysPushErrorMem();
        ::close(fds[0]);
        ::close(fds[1]);
        return {};
    }

    // NOTE(review): both early returns below destroy `object`; with no secondary pair its
    // destructor unlocks the mutex and resets the event - verify that is intended here.
    if (object->bDead)
    {
        return {};
    }

    mutex->WaitOn(0);

    if (event->IsSignaled())
    {
        mutex->Unlock();
        SysPushErrorIO("Pipe Busy");
        return {};
    }

    event->Set();

    return object;
}
|
|
} |