/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuIPCPipe.Linux.cpp Date: 2022-4-14 Author: Reece ***/ #include #include "IPC.hpp" #include "AuIPCHandle.hpp" #include "AuIPCPipe.Unix.hpp" #include #include #include #include #include namespace Aurora::IO::IPC { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Pipes ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// IPCPipeImpl::~IPCPipeImpl() { int fd {-1}; if (secondary[0] == -1) { this->mutex_->Unlock(); this->event_->Reset(); } if ((fd = AuExchange(fds[0], -1)) != -1) { IO::UNIX::FDServeEnd(readEnd_); //::close(fd); } if ((fd = AuExchange(fds[1], -1)) != -1) { IO::UNIX::FDServeEnd(writeEnd_); //::close(fd); } if ((fd = AuExchange(secondary[0], -1)) != -1) { ::close(fd); } if ((fd = AuExchange(secondary[1], -1)) != -1) { ::close(fd); } fsHandle_.reset(); fsStream_.reset(); } void IPCPipeImpl::DrainOtherFd() { #if defined(FIONREAD) int available {}; if (::ioctl(this->secondary[0], FIONREAD, &available) < 0) { return; } if (!available) { return; } AuByteBuffer temp(available); SysAssert(temp); AuUInt uCounter {}; while (this->ReadEx({temp, uCounter}, true, false)) { temp.writePtr = temp.readPtr = temp.base; } #else AuByteBuffer temp(4096 * 10); SysAssert(temp); AuUInt uCounter {}; while (this->ReadEx({temp, uCounter}, true, false)) { temp.writePtr = temp.readPtr = temp.base; } #endif } AuUInt IPCPipeImpl::GetPreemptFd() { return this->eventPreempt_.GetHandle(); } bool IPCPipeImpl::LIOS_PopOne() { if (!this->bIsSendingZero) { return false; } #if defined(FIONREAD) int available {}; if (::ioctl(fds[0], FIONREAD, &available) < 0) { return false; } if (available) { return false; } #endif this->FinishFinalize(); return true; } void IPCPipeImpl::FinishFinalize() { this->bIsSendingZero = false; this->bHasDied = true; } const AuList &IPCPipeImpl::GetHandles() { return this->handles; } bool IPCPipeImpl::Singular() { return false; } bool IPCPipeImpl::OnTrigger(AuUInt handle) { return true; } IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr event, AuSPtr mutex) : fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]}, readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex), eventPreempt_(false, false, true), pipeReader_(this), pipeWriter_(this) { this->handles = {fds[0], eventPreempt_.GetHandle()}; #if defined(AURORA_IS_LINUX_DERIVED) if (fds3[0] != -1) { AuStaticCast(mutex)->pMutexClosedHook = this; } #endif this->fsHandle_ = AuIO::IOHandleShared(); SysAssert(this->fsHandle_); this->fsStream_ = AuMakeShared(); AuStaticCast(this->fsHandle_)->pIPCPipe = this; this->fsHandle_->InitFromPairMove(fds2[0], fds2[1]); this->fsStream_->Init(this->fsHandle_); this->fsBlockingStream_ = AuFS::OpenBlockingFileStreamFromHandle(this->fsHandle_); SysAssert(this->fsBlockingStream_); } AuSPtr IPCPipeImpl::AsReadChannelIsOpen() { return this->event_; } AuSPtr IPCPipeImpl::AsReadChannelHasData() { return AuSharedFromThis(); } AuSPtr IPCPipeImpl::NewAsyncTransaction() { return this->fsStream_->NewTransaction(); } AuSPtr IPCPipeImpl::GetCurrentSharedDuplexHandles() { return this->fsHandle_; } bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) { return this->ReadEx(write, nonblock, true); } bool IPCPipeImpl::ReadEx(const Memory::MemoryViewStreamWrite &write, bool nonblock, bool bSide) { auto handle = bSide ? this->fds[0] : this->secondary[0]; auto control = ::fcntl(handle, F_GETFL); auto ref = control; if (control < 0) { return false; } if (nonblock) { control |= O_NONBLOCK; } else { control &= ~O_NONBLOCK; } if (ref != control) { ::fcntl(handle, F_SETFL, control); } if (!write.ptr) { #if defined(FIONREAD) int available {}; if (::ioctl(handle, FIONREAD, &available) < 0) { return false; } write.outVariable = available; return true; #else SysPushErrorHAL("Cannot pull bytes available in the IPC on this UNIX platform"); return false; #endif } int tmp; do { tmp = ::read(handle, write.ptr, write.length); } while ((tmp == -1 && errno == EINTR)); if (tmp <= 0) { if (tmp == 0) { return nonblock; } SysPushErrorMem(); return false; } write.outVariable = tmp; return true; } bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)//, bool nonblock) { bool nonblock = true; auto handle = this->fds[1]; auto control = ::fcntl(handle, F_GETFL); auto ref = control; if (nonblock) { control |= O_NONBLOCK; } else { control &= ~O_NONBLOCK; } if (ref != control) { ::fcntl(handle, F_SETFL, control); } int tmp; do { tmp = ::write(handle, read.ptr, read.length); } while ((tmp == -1 && errno == EINTR)); if (tmp <= 0) { if (tmp == 0) { return nonblock; } SysPushErrorMem(); return false; } read.outVariable = tmp; return true; } bool IPCPipeImpl::IsSignaled() { return LSHandle::IsSignaled(); } bool IPCPipeImpl::WaitOn(AuUInt32 timeout) { return LSHandle::WaitOn(timeout); } Loop::ELoopSource IPCPipeImpl::GetType() { return Loop::ELoopSource::eSourceIPCReadPipe; } #if defined(AURORA_IS_LINUX_DERIVED) bool IPCPipeImpl::OnClosed() { if (this->bHasDied) { this->DrainOtherFd(); this->bHasDied = false; this->eventPreempt_.Reset(); this->event_->Reset(); return true; } this->SendTerminateSignal(); return false; } #endif void IPCPipeImpl::SendTerminateSignal() { this->bIsSendingZero = true; this->eventPreempt_.Set(); this->event_->Reset(); } AuString IPCPipeImpl::ExportToString() { IPC::IPCHandle handle; handle.PushId(EIPCHandleType::eIPCPipe, AuStaticCast(this->event_)->handle_.values[0].token); #if defined(AURORA_IS_LINUX_DERIVED) auto that = AuStaticCast(this->mutex_); handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, that->token_); SysAssert(that->mem_); handle.PushId(EIPCHandleType::eIPCMemory, AuStaticCast(that->mem_)->handle_.values[0].token); handle.values[1].token.word = that->index_; #else handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, ->handle_.values[0].token); #endif handle.PushId(EIPCHandleType::eIPCPipeEnd, this->readEnd_); handle.PushId(EIPCHandleType::eIPCPipeEnd, this->writeEnd_); return handle.ToString(); } AuSPtr IPCPipeImpl::ToFileStream() { return this->fsBlockingStream_; } AuSPtr IPCPipeImpl::ToStreamReader() { return AuSPtr(this->SharedFromThis(), &this->pipeReader_); } AuSPtr IPCPipeImpl::ToStreamWriter() { return AuSPtr(this->SharedFromThis(), &this->pipeWriter_); } AUKN_SYM AuSPtr NewPipeEx(AuUInt32 uPipeLength) { IPCToken readEnd, writeEnd; int fds1[2]; int fds2[2]; auto event = NewEvent(false, false); if (!event) { SysPushErrorNested(); return {}; } auto mutex = NewMutex(); if (!mutex) { SysPushErrorNested(); return {}; } if (::pipe(fds1) == -1) { SysPushErrorIO("no pipe"); return {}; } if (::pipe(fds2) == -1) { SysPushErrorIO("no pipe"); ::close(fds1[0]); ::close(fds1[1]); return {}; } #if defined(F_SETPIPE_SZ) if (uPipeLength) { uPipeLength = AuPageRound(uPipeLength, AuHwInfo::GetPageSize()); if (fcntl(fds1[1], F_SETPIPE_SZ, uPipeLength) == -1 || fcntl(fds2[1], F_SETPIPE_SZ, uPipeLength) == -1) { SysPushErrorIO("couldnt expand pipe"); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } } #endif if (!IO::UNIX::FDServe(fds1[0], readEnd)) { SysPushErrorIO(); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } if (!IO::UNIX::FDServe(fds2[1], writeEnd)) { SysPushErrorIO(); IO::UNIX::FDServeEnd(readEnd); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } int serverPair[2] {fds2[0], fds1[1]}; int clientPair[2] {fds1[0], fds2[1]}; auto handle = AuMakeShared(serverPair, clientPair, readEnd, writeEnd, event, mutex); if (!handle) { SysPushErrorMem(); IO::UNIX::FDServeEnd(readEnd); IO::UNIX::FDServeEnd(writeEnd); ::close(fds1[0]); ::close(fds1[1]); ::close(fds2[0]); ::close(fds2[1]); return {}; } return handle; } AUKN_SYM AuSPtr NewPipe() { return NewPipeEx(0); } AUKN_SYM AuSPtr ImportPipe(const AuString &handleString) { int fds[2] {-1, -1}; IPCHandle handle; if (!handle.FromString(handleString)) { SysPushErrorParseError(); return {}; } auto eventVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipe, 0); if (!eventVal) { SysPushErrorParseError(); return {}; } auto mutexVal = handle.GetToken(IPC::EIPCHandleType::eIPCPrimitiveMutex, 1); if (!mutexVal) { SysPushErrorParseError(); return {}; } int offset {}; #if defined(AURORA_IS_LINUX_DERIVED) offset = 1; auto ipcMem = handle.GetToken(IPC::EIPCHandleType::eIPCMemory, 2); if (!ipcMem) { SysPushErrorParseError(); return {}; } #endif auto readVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 2 + offset); if (!readVal) { SysPushErrorParseError(); return {}; } auto writeVal = handle.GetToken(IPC::EIPCHandleType::eIPCPipeEnd, 3 + offset); if (!writeVal) { SysPushErrorParseError(); return {}; } auto event = ImportEventEx(eventVal->token); if (!event) { return {}; } auto mutex = ImportMutexEx(mutexVal->token, ipcMem->token, mutexVal->token.word); if (!mutex) { return {}; } if (!IO::UNIX::FDAccept(readVal->token, fds[0])) { SysPushErrorNested(); return {}; } if (!IO::UNIX::FDAccept(writeVal->token, fds[1])) { ::close(fds[0]); SysPushErrorNested(); return {}; } int dummy[2] {-1, -1}; auto object = AuMakeShared(fds, dummy, readVal->token, writeVal->token, event, mutex); if (!object) { SysPushErrorMem(); ::close(fds[0]); ::close(fds[1]); return {}; } mutex->WaitOn(0); if (event->IsSignaled()) { mutex->Unlock(); SysPushErrorIO("Pipe Busy"); ::close(fds[0]); ::close(fds[1]); return {}; } event->Set(); return object; } }