/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IPCPipe.Linux.cpp Date: 2022-4-14 Author: Reece ***/ #include #include "IPC.hpp" #include "IPCHandle.hpp" #include "IPCPipe.Unix.hpp" #include #include #include #include #include #include #include namespace Aurora::IPC { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Pipes ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis { IPCPipeImpl(int (fds)[2], int (secondary)[2], IPCHandle readEnd, IPCHandle writeEnd, AuSPtr event, AuSPtr mutex); ~IPCPipeImpl(); PROXY_INTERNAL_INTERFACE_(LSHandle::) virtual AuSPtr NewAsyncTransaction() override; virtual AuSPtr AsReadChannelIsOpen() override; virtual AuSPtr AsReadChannelHasData() override; virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) override; virtual bool Write(const Memory::MemoryViewStreamRead &read) override; virtual AuString ExportToString() override; bool IsSignaled() override; bool WaitOn(AuUInt32 timeout) override; Loop::ELoopSource GetType() override; private: int fds[2] {-1, -1}; int secondary[2] {-1, -1}; AuSPtr fsHandle_; AuSPtr fsStream_; IPCHandle readEnd_; IPCHandle writeEnd_; AuSPtr event_; AuSPtr mutex_; }; IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCHandle readEnd, IPCHandle writeEnd, AuSPtr event, AuSPtr mutex) : fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]}, readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex) { this->handle = fds[0]; this->fsHandle_ = AuMakeShared(); this->fsStream_ = AuMakeShared(); this->fsHandle_->Init(fds2[0], fds2[1]); this->fsStream_->Init(this->fsHandle_); } IPCPipeImpl::~IPCPipeImpl() { int fd {-1}; if ((fd = AuExchange(fds[0], -1)) != -1) { IO::UNIX::FDServeEnd(readEnd_); //::close(fd); } if ((fd = AuExchange(fds[1], -1)) != -1) { IO::UNIX::FDServeEnd(writeEnd_); //::close(fd); } if ((fd = AuExchange(secondary[0], -1)) != -1) { ::close(fd); } if ((fd = AuExchange(secondary[1], -1)) != -1) { ::close(fd); } fsHandle_.reset(); fsStream_.reset(); if (secondary[0] == -1) { event_->Reset(); } } AuSPtr IPCPipeImpl::AsReadChannelIsOpen() { return this->event_; } AuSPtr IPCPipeImpl::AsReadChannelHasData() { return AuSharedFromThis(); } AuSPtr IPCPipeImpl::NewAsyncTransaction() { return this->fsStream_->NewTransaction(); } bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) { auto handle = fds[0]; auto control = ::fcntl(handle, F_GETFL); auto ref = control; if (nonblock) { control |= O_NONBLOCK; } else { control &= ~O_NONBLOCK; } if (ref != control) { ::fcntl(handle, F_SETFL, control); } if (!write.ptr) { #if defined(FIONREAD) int available {}; if (::ioctl(ref, FIONREAD, &available) < 0) { return false; } write.outVariable = available; return true; #else SysPushErrorHAL("Cannot pull bytes available in the IPC on this UNIX platform"); return false; #endif } int tmp; do { tmp = ::read(handle, write.ptr, write.length); } while ((tmp == -1 && errno == EINTR)); if (tmp <= 0) { if (tmp == 0) { return nonblock; } SysPushErrorMem(); return false; } write.outVariable = tmp; return true; } bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)//, bool nonblock) { bool nonblock = false; auto handle = this->fds[1]; auto control = ::fcntl(handle, F_GETFL); auto ref = control; if (nonblock) { control |= O_NONBLOCK; } else { control &= ~O_NONBLOCK; } if (ref != control) { ::fcntl(handle, F_SETFL, control); } int tmp; do { tmp = ::write(handle, read.ptr, read.length); } while ((tmp == -1 && errno == EINTR)); if (tmp <= 0) { if (tmp == 0) { return nonblock; } SysPushErrorMem(); return false; } read.outVariable = tmp; return true; } bool IPCPipeImpl::IsSignaled() { return LSHandle::IsSignaled(); } bool IPCPipeImpl::WaitOn(AuUInt32 timeout) { return LSHandle::WaitOn(timeout); } Loop::ELoopSource IPCPipeImpl::GetType() { return Loop::ELoopSource::eSourceIPCReadPipe; } AuString IPCPipeImpl::ExportToString() { return this->readEnd_.ToString() + "." + this->writeEnd_.ToString() + "." + this->event_->ExportToString() + "." + this->mutex_->ExportToString(); } AUKN_SYM AuSPtr NewPipe() { IPCHandle readEnd, writeEnd; int fds1[2]; int fds2[2]; auto event = NewEvent(false, false); if (!event) { SysPushErrorNested(); return {}; } auto mutex = NewMutex(); if (!mutex) { SysPushErrorNested(); return {}; } if (::pipe(fds1) == -1) { SysPushErrorIO(); return {}; } if (::pipe(fds2) == -1) { SysPushErrorIO(); ::close(fds1[0]); ::close(fds1[1]); return {}; } if (!IO::UNIX::FDServe(true, true, true, true, fds1[0], readEnd)) { SysPushErrorIO(); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } if (!IO::UNIX::FDServe(true, true, true, true, fds2[1], writeEnd)) { SysPushErrorIO(); IO::UNIX::FDServeEnd(readEnd); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } int serverPair[2] {fds2[0], fds1[1]}; int clientPair[2] {fds1[0], fds2[1]}; auto handle = AuMakeShared(serverPair, clientPair, readEnd, writeEnd, event, mutex); if (!handle) { SysPushErrorMem(); IO::UNIX::FDServeEnd(readEnd); IO::UNIX::FDServeEnd(writeEnd); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } return handle; } AUKN_SYM AuSPtr ImportPipe(const AuString &handle) { IPCHandle readEnd, writeEnd; int fds[2] {-1, -1}; auto itr = handle.find('.'); if (itr == AuString::npos) { return {}; } auto itr2 = handle.find_first_of('.', itr + 1); if (itr2 == AuString::npos) { return {}; } auto itr3 = handle.find_first_of('.', itr2 + 1); if (itr3 == AuString::npos) { return {}; } auto readString = handle.substr(0, itr); auto writeString = handle.substr(itr + 1, itr2 - itr - 1); auto eventString = handle.substr(itr2 + 1, itr3 - itr2 - 1); auto mutexString = handle.substr(itr3 + 1); if (!readEnd.FromString(readString)) { SysPushErrorParseError(); return {}; } if (!writeEnd.FromString(writeString)) { SysPushErrorParseError(); return {}; } auto event = ImportEvent(eventString); if (!event) { return {}; } auto mutex = ImportMutex(mutexString); if (!mutex) { return {}; } if (!IO::UNIX::FDAccept(readEnd, fds[0])) { SysPushErrorNested(); return {}; } if (!IO::UNIX::FDAccept(writeEnd, fds[1])) { ::close(fds[0]); SysPushErrorNested(); return {}; } int dummy[2] {-1, -1}; auto object = AuMakeShared(fds, dummy, readEnd, writeEnd, event, mutex); if (!object) { SysPushErrorMem(); ::close(fds[0]); ::close(fds[1]); return {}; } mutex->WaitOn(); if (event->IsSignaled()) { mutex->Unlock(); SysPushErrorIO("Pipe Busy"); ::close(fds[0]); ::close(fds[1]); return {}; } event->Set(); mutex->Unlock(); return object; } }