[+] IPC pipes

[*] Fix socket leak under UNIX fallback path
This commit is contained in:
Reece Wilson 2022-04-15 15:46:07 +01:00
parent 2ce106d8a9
commit 9f2ff9de19
10 changed files with 338 additions and 61 deletions

View File

@ -9,13 +9,12 @@
namespace Aurora::IPC
{
struct IPCPipe : IExportableIPC
struct IPCPipe : IExportableIPC, Loop::ILoopSource
{
virtual AuSPtr<Loop::ILoopSource> ToLoopQueue() = 0;
virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblock = true) = 0;
virtual bool Read (const Memory::MemoryViewStreamWrite &write, bool nonblock = true) = 0;
virtual bool Write(const Memory::MemoryViewStreamRead &read, bool nonblock = true) = 0;
};
AUKN_SYM AuSPtr<IPCPipe> NewPipe();
AUKN_SYM AuSPtr<IPCPipe> ImportPipe(const AuString &handle);
}

View File

@ -33,6 +33,9 @@ namespace Aurora::Loop
// async work group trigger
eSourceAsync,
// IPC pipe
eSourceIPCReadPipe,
// glib oses only
eSourceGlib,

View File

@ -436,6 +436,7 @@ namespace Aurora::IO::UNIX
SysPushErrorNested();
}
::close(fd);
return true;
}

View File

@ -9,5 +9,44 @@
namespace Aurora::IPC
{
#define PROXY_INTERNAL_INTERFACE_(Base)\
virtual void OnPresleep() override \
{ \
Base OnPresleep(); \
}; \
virtual bool OnTrigger(AuUInt handle) override \
{ \
return Base OnTrigger(handle); \
} \
virtual void OnFinishSleep() override \
{ \
Base OnFinishSleep(); \
} \
virtual bool Singular() override \
{ \
return Base Singular(); \
} \
virtual AuUInt GetHandle() override \
{ \
return Base GetHandle(); \
} \
virtual AuList<AuUInt> GetHandles() override \
{ \
return Base GetHandles(); \
} \
virtual AuList<AuUInt> GetWriteHandles() override \
{ \
return Base GetWriteHandles(); \
} \
virtual AuUInt GetWriteHandle() override \
{ \
return Base GetWriteHandle(); \
} \
bool HasValidHandle() \
{ \
return Base HasValidHandle(); \
}
#define PROXY_INTERNAL_INTERFACE(Base) PROXY_INTERNAL_INTERFACE_(Base.)
}

View File

@ -17,6 +17,11 @@ namespace Aurora::IPC
// Dead simple to implement, it's 22:07, and i wanna sleep soon.
// We should text serialize a bitmap later...
IPCHandle::IPCHandle()
{
AuMemset(this, 0, sizeof(*this));
}
void IPCHandle::NewId(bool a, bool b, bool c, bool d)
{
this->flags[0] = a;
@ -25,27 +30,33 @@ namespace Aurora::IPC
this->flags[3] = d;
this->word = 0;
NewId();
}
void IPCHandle::NewId(AuUInt len)
{
AuMemset(this->flags, 0, sizeof(this->flags));
this->word = len;
NewId();
}
void IPCHandle::NewId()
{
#if defined(AURORA_IS_POSIX_DERIVED)
this->cookie = AuRng::RngU32();
this->pid = getpid();
while (!this->cookie)
{
this->cookie = AuRng::RngU32();
}
this->pid = getpid();
#else
auto temp = AuRng::ReadString(AuArraySize(this->path), AuRng::ERngStringCharacters::eAlphaNumericCharacters);
AuMemcpy(this->path, temp.data(), data.size());
#endif
}
void IPCHandle::NewId(AuUInt len)
{
AuMemset(this->flags, 0, sizeof(this->flags));
this->word = len;
NewId();
}
bool IPCHandle::FromString(const AuString &in)
{
if (in.size() < 4)
@ -66,7 +77,7 @@ namespace Aurora::IPC
return false;
}
AuMemcpy(path, &in[4], 16);
AuMemcpy(this->path, &in[4], 16);
if (in.size() > 4 + 16)
{
@ -102,6 +113,11 @@ namespace Aurora::IPC
}
this->cookie = word;
if (this->cookie == 0)
{
return false;
}
word = strtoll(endPtr + 1, &endPtr, 10);
if (errno == ERANGE)

View File

@ -11,6 +11,8 @@ namespace Aurora::IPC
{
struct IPCHandle
{
IPCHandle();
bool flags[4];
AuUInt word;
union
@ -26,7 +28,7 @@ namespace Aurora::IPC
void NewId();
void NewId(AuUInt len);
void NewId(bool a, bool b, bool c, bool d);
bool FromString(const AuString &in);
AuString ToString() const;
AuString ToNTPath() const;

View File

@ -10,21 +10,274 @@
#include "IPCHandle.hpp"
#include "IPCPipe.Unix.hpp"
#include <Source/IO/UNIX/FDIpcServer.hpp>
#include <Source/Loop/ILoopSourceEx.hpp>
#include <Source/Loop/LSHandle.hpp>
#include <fcntl.h>
namespace Aurora::IPC
{
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Pipes
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct IPCPipeImpl : IPCPipe, Loop::LSHandle
{
IPCPipeImpl(int (fds)[2], IPCHandle readEnd, IPCHandle writeEnd);
~IPCPipeImpl();
PROXY_INTERNAL_INTERFACE_(LSHandle::)
virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) override;
virtual bool Write(const Memory::MemoryViewStreamRead &read, bool nonblock) override;
virtual AuString ExportToString() override;
bool IsSignaled() override;
bool WaitOn(AuUInt32 timeout) override;
Loop::ELoopSource GetType() override;
private:
int fds[2] {-1, -1};
//Loop::LSHandle lsHandle_;
IPCHandle readEnd_;
IPCHandle writeEnd_;
};
IPCPipeImpl::IPCPipeImpl(int (fds2)[2], IPCHandle readEnd, IPCHandle writeEnd) :
fds {fds2[0], fds2[1]}, readEnd_(readEnd), writeEnd_(writeEnd)
{
this->handle = fds[0];
}
IPCPipeImpl::~IPCPipeImpl()
{
int fd {-1};
if ((fd = AuExchange(fds[0], -1)) != -1)
{
IO::UNIX::FDServeEnd(readEnd_);
::close(fd);
}
if ((fd = AuExchange(fds[1], -1)) != -1)
{
IO::UNIX::FDServeEnd(writeEnd_);
::close(fd);
}
}
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock)
{
auto handle = fds[0];
auto control = ::fcntl(handle, F_GETFL);
auto ref = control;
if (nonblock)
{
control |= O_NONBLOCK;
}
else
{
control &= ~O_NONBLOCK;
}
if (ref != control)
{
::fcntl(handle, F_SETFL, control);
}
int tmp;
do
{
tmp = ::read(handle, write.ptr, write.length);
} while ((tmp == -1 && errno == EINTR));
if (tmp <= 0)
{
if (tmp == 0)
{
return nonblock;
}
SysPushErrorMem();
return false;
}
write.outVariable = tmp;
return true;
}
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read, bool nonblock)
{
auto handle = this->fds[1];
auto control = ::fcntl(handle, F_GETFL);
auto ref = control;
if (nonblock)
{
control |= O_NONBLOCK;
}
else
{
control &= ~O_NONBLOCK;
}
if (ref != control)
{
::fcntl(handle, F_SETFL, control);
}
int tmp;
do
{
tmp = ::write(handle, read.ptr, read.length);
} while ((tmp == -1 && errno == EINTR));
if (tmp <= 0)
{
if (tmp == 0)
{
return nonblock;
}
SysPushErrorMem();
return false;
}
read.outVariable = tmp;
return true;
}
bool IPCPipeImpl::IsSignaled()
{
return LSHandle::IsSignaled();
}
bool IPCPipeImpl::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
Loop::ELoopSource IPCPipeImpl::GetType()
{
return Loop::ELoopSource::eSourceIPCReadPipe;
}
AuString IPCPipeImpl::ExportToString()
{
return this->readEnd_.ToString() + "." + this->writeEnd_.ToString();
}
AUKN_SYM AuSPtr<IPCPipe> NewPipe()
{
SysPushErrorUnimplemented();
return {};
IPCHandle readEnd, writeEnd;
int fds[2];
if (::pipe(fds) == -1)
{
SysPushErrorIO();
return {};
}
if (!IO::UNIX::FDServe(true, true, true, true, fds[0], readEnd))
{
SysPushErrorIO();
::close(fds[0]);
::close(fds[1]);
return {};
}
if (!IO::UNIX::FDServe(true, true, true, true, fds[1], writeEnd))
{
SysPushErrorIO();
IO::UNIX::FDServeEnd(readEnd);
::close(fds[0]);
::close(fds[1]);
return {};
}
auto handle = AuMakeShared<IPCPipeImpl>(fds, readEnd, writeEnd);
if (!handle)
{
SysPushErrorMem();
IO::UNIX::FDServeEnd(readEnd);
IO::UNIX::FDServeEnd(writeEnd);
::close(fds[0]);
::close(fds[1]);
return {};
}
return handle;
}
AUKN_SYM AuSPtr<IPCPipe> ImportPipe(const AuString &handle)
{
SysPushErrorUnimplemented();
return {};
IPCHandle readEnd, writeEnd;
int fds[2] {-1, -1};
auto itr = handle.find('.');
if (itr == AuString::npos)
{
return {};
}
auto readString = handle.substr(0, itr);
auto writeString = handle.substr(itr + 1);
if (!readEnd.FromString(readString))
{
SysPushErrorParseError();
return {};
}
if (!writeEnd.FromString(writeString))
{
SysPushErrorParseError();
return {};
}
if (!IO::UNIX::FDAccept(readEnd, fds[0]))
{
SysPushErrorNested();
return {};
}
if (!IO::UNIX::FDAccept(writeEnd, fds[1]))
{
::close(fds[0]);
SysPushErrorNested();
return {};
}
if (!IO::UNIX::FDServe(true, true, true, true, fds[0], readEnd))
{
SysPushErrorIO();
::close(fds[0]);
::close(fds[1]);
return {};
}
if (!IO::UNIX::FDServe(true, true, true, true, fds[1], writeEnd))
{
SysPushErrorIO();
IO::UNIX::FDServeEnd(readEnd);
::close(fds[0]);
::close(fds[1]);
return {};
}
auto object = AuMakeShared<IPCPipeImpl>(fds, readEnd, writeEnd);
if (!object)
{
SysPushErrorMem();
IO::UNIX::FDServeEnd(readEnd);
IO::UNIX::FDServeEnd(writeEnd);
::close(fds[0]);
::close(fds[1]);
return {};
}
return object;
}
}

View File

@ -21,44 +21,6 @@
namespace Aurora::IPC
{
#define PROXY_INTERNAL_INTERFACE(Base)\
virtual void OnPresleep() override \
{ \
Base.OnPresleep(); \
}; \
virtual bool OnTrigger(AuUInt handle) override \
{ \
return Base.OnTrigger(handle); \
} \
virtual void OnFinishSleep() override \
{ \
Base.OnFinishSleep(); \
} \
virtual bool Singular() override \
{ \
return Base.Singular(); \
} \
virtual AuUInt GetHandle() override \
{ \
return Base.GetHandle(); \
} \
virtual AuList<AuUInt> GetHandles() override \
{ \
return Base.GetHandles(); \
} \
virtual AuList<AuUInt> GetWriteHandles() override \
{ \
return Base.GetWriteHandles(); \
} \
virtual AuUInt GetWriteHandle() override \
{ \
return Base.GetWriteHandle(); \
} \
bool HasValidHandle() \
{ \
return Base.HasValidHandle(); \
}
#define IMPLEMENT_HANDLE \
IPC::IPCHandle handle_; \
AuString ExportToString() override \

View File

@ -169,7 +169,7 @@ namespace Aurora::Processes
auto ret = ReadFile(handle, destination.ptr, size, &size, NULL);
destination.outVariable = size;
return ret;
return ret || nonblock;
}
bool ProcessImpl::Write(const AuMemoryViewStreamRead &in)

View File

@ -165,10 +165,12 @@ namespace Aurora::Processes
if (tmp <= 0)
{
if (tmp != 0)
if (tmp == 0)
{
SysPushErrorMem("couldn't read error : %i\n", errno);
return nonblock;
}
SysPushErrorMem();
return false;
}