/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IPCPipe.Linux.cpp Date: 2022-4-14 Author: Reece ***/ #include #include "IPC.hpp" #include "IPCHandle.hpp" #include "IPCPipe.Unix.hpp" #include #include #include #include #if defined(AURORA_IS_LINUX_DERIVED) #include #include #include "IPCPrimitives.Linux.hpp" #include "IPCMutexFutex.Linux.hpp" #endif #include "IPCMemory.Unix.hpp" // required for handle #include namespace Aurora::IO::IPC { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Pipes ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis { IPCPipeImpl(int (fds)[2], int (secondary)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr event, AuSPtr mutex); ~IPCPipeImpl(); PROXY_INTERNAL_INTERFACE_(LSHandle::) virtual AuSPtr NewAsyncTransaction() override; virtual AuSPtr AsReadChannelIsOpen() override; virtual AuSPtr AsReadChannelHasData() override; virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) override; virtual bool Write(const Memory::MemoryViewStreamRead &read) override; virtual AuString ExportToString() override; bool IsSignaled() override; bool WaitOn(AuUInt32 timeout) override; Loop::ELoopSource GetType() override; private: int fds[2] {-1, -1}; int secondary[2] {-1, -1}; AuSPtr fsHandle_; AuSPtr fsStream_; IPCToken readEnd_; IPCToken writeEnd_; AuSPtr event_; AuSPtr mutex_; }; IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr event, AuSPtr mutex) : fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]}, readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex) { this->handle = fds[0]; this->fsHandle_ = AuMakeShared(); this->fsStream_ = AuMakeShared(); this->fsHandle_->Init(fds2[0], fds2[1]); this->fsStream_->Init(this->fsHandle_); } IPCPipeImpl::~IPCPipeImpl() { int fd {-1}; if ((fd = AuExchange(fds[0], -1)) != -1) { IO::UNIX::FDServeEnd(readEnd_); //::close(fd); } if ((fd = AuExchange(fds[1], -1)) != -1) { IO::UNIX::FDServeEnd(writeEnd_); //::close(fd); } if ((fd = AuExchange(secondary[0], -1)) != -1) { ::close(fd); } if ((fd = AuExchange(secondary[1], -1)) != -1) { ::close(fd); } fsHandle_.reset(); fsStream_.reset(); if (secondary[0] == -1) { event_->Reset(); } } AuSPtr IPCPipeImpl::AsReadChannelIsOpen() { return this->event_; } AuSPtr IPCPipeImpl::AsReadChannelHasData() { return AuSharedFromThis(); } AuSPtr IPCPipeImpl::NewAsyncTransaction() { return this->fsStream_->NewTransaction(); } bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) { auto handle = fds[0]; auto control = ::fcntl(handle, F_GETFL); auto ref = control; if (nonblock) { control |= O_NONBLOCK; } else { control &= ~O_NONBLOCK; } if (ref != control) { ::fcntl(handle, F_SETFL, control); } if (!write.ptr) { #if defined(FIONREAD) int available {}; if (::ioctl(ref, FIONREAD, &available) < 0) { return false; } write.outVariable = available; return true; #else SysPushErrorHAL("Cannot pull bytes available in the IPC on this UNIX platform"); return false; #endif } int tmp; do { tmp = ::read(handle, write.ptr, write.length); } while ((tmp == -1 && errno == EINTR)); if (tmp <= 0) { if (tmp == 0) { return nonblock; } SysPushErrorMem(); return false; } write.outVariable = tmp; return true; } bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)//, bool nonblock) { bool nonblock = true; auto handle = this->fds[1]; auto control = ::fcntl(handle, F_GETFL); auto ref = control; if (nonblock) { control |= O_NONBLOCK; } else { control &= ~O_NONBLOCK; } if (ref != control) { ::fcntl(handle, F_SETFL, control); } int tmp; do { tmp = ::write(handle, read.ptr, read.length); } while ((tmp == -1 && errno == EINTR)); if (tmp <= 0) { if (tmp == 0) { return nonblock; } SysPushErrorMem(); return false; } read.outVariable = tmp; return true; } bool IPCPipeImpl::IsSignaled() { return LSHandle::IsSignaled(); } bool IPCPipeImpl::WaitOn(AuUInt32 timeout) { return LSHandle::WaitOn(timeout); } Loop::ELoopSource IPCPipeImpl::GetType() { return Loop::ELoopSource::eSourceIPCReadPipe; } AuString IPCPipeImpl::ExportToString() { IPC::IPCHandle handle; handle.PushId(EIPCHandleType::eIPCPipe, AuStaticCast(this->event_)->handle_.values[0].token); #if defined(AURORA_IS_LINUX_DERIVED) auto that = AuStaticCast(this->mutex_); handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, that->token_); SysAssert(that->mem_); handle.PushId(EIPCHandleType::eIPCMemory, AuStaticCast(that->mem_)->handle_.values[0].token); handle.values[1].token.word = that->index_; #else handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, ->handle_.values[0].token); #endif handle.PushId(EIPCHandleType::eIPCPipeEnd, this->readEnd_); handle.PushId(EIPCHandleType::eIPCPipeEnd, this->writeEnd_); return handle.ToString(); } AUKN_SYM AuSPtr NewPipe() { IPCToken readEnd, writeEnd; int fds1[2]; int fds2[2]; auto event = NewEvent(false, false); if (!event) { SysPushErrorNested(); return {}; } auto mutex = NewMutex(); if (!mutex) { SysPushErrorNested(); return {}; } if (::pipe(fds1) == -1) { SysPushErrorIO(); return {}; } if (::pipe(fds2) == -1) { SysPushErrorIO(); ::close(fds1[0]); ::close(fds1[1]); return {}; } if (!IO::UNIX::FDServe(fds1[0], readEnd)) { SysPushErrorIO(); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } if (!IO::UNIX::FDServe(fds2[1], writeEnd)) { SysPushErrorIO(); IO::UNIX::FDServeEnd(readEnd); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } int serverPair[2] {fds2[0], fds1[1]}; int clientPair[2] {fds1[0], fds2[1]}; auto handle = AuMakeShared(serverPair, clientPair, readEnd, writeEnd, event, mutex); if (!handle) { SysPushErrorMem(); IO::UNIX::FDServeEnd(readEnd); IO::UNIX::FDServeEnd(writeEnd); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } return handle; } AUKN_SYM AuSPtr ImportPipe(const AuString &handleString) { int fds[2] {-1, -1}; IPCHandle handle; if (!handle.FromString(handleString)) { SysPushErrorParseError(); return {}; } auto eventVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipe, 0); if (!eventVal) { SysPushErrorParseError(); return {}; } auto mutexVal = handle.GetToken(IPC::EIPCHandleType::eIPCPrimitiveMutex, 1); if (!mutexVal) { SysPushErrorParseError(); return {}; } int offset {}; #if defined(AURORA_IS_LINUX_DERIVED) offset = 1; auto ipcMem = handle.GetToken(IPC::EIPCHandleType::eIPCMemory, 2); if (!ipcMem) { SysPushErrorParseError(); return {}; } #endif auto readVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 2 + offset); if (!readVal) { SysPushErrorParseError(); return {}; } auto writeVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 3 + offset); if (!writeVal) { SysPushErrorParseError(); return {}; } auto event = ImportEventEx(eventVal->token); if (!event) { return {}; } auto mutex = ImportMutexEx(mutexVal->token, ipcMem->token, mutexVal->token.word); if (!mutex) { return {}; } if (!IO::UNIX::FDAccept(readVal->token, fds[0])) { SysPushErrorNested(); return {}; } if (!IO::UNIX::FDAccept(writeVal->token, fds[1])) { ::close(fds[0]); SysPushErrorNested(); return {}; } int dummy[2] {-1, -1}; auto object = AuMakeShared(fds, dummy, readVal->token, writeVal->token, event, mutex); if (!object) { SysPushErrorMem(); ::close(fds[0]); ::close(fds[1]); return {}; } mutex->WaitOn(); if (event->IsSignaled()) { mutex->Unlock(); SysPushErrorIO("Pipe Busy"); ::close(fds[0]); ::close(fds[1]); return {}; } event->Set(); mutex->Unlock(); return object; } }