[*] Update UNIX pipe to follow NT-like bidirectional semantics. It's not possible to half NT, but it's possible to double the pipes in UNIX land.
This commit is contained in:
parent
1f15674016
commit
06e4411511
@ -34,7 +34,7 @@ namespace Aurora::IPC
|
||||
AuWPtr<IPCPipeImpl> parent_;
|
||||
};
|
||||
|
||||
struct IPCPipeImpl : IPCPipe, Loop::LSHandle, public AuEnableSharedFromThis<IPCPipeImpl>
|
||||
struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis<IPCPipeImpl>
|
||||
{
|
||||
IPCPipeImpl(HANDLE clientHandle, HANDLE serverHandle, const IPCHandle &handle);
|
||||
~IPCPipeImpl();
|
||||
|
@ -22,15 +22,20 @@ namespace Aurora::IPC
|
||||
// Pipes
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct IPCPipeImpl : IPCPipe, Loop::LSHandle
|
||||
struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis<IPCPipeImpl>
|
||||
{
|
||||
IPCPipeImpl(int (fds)[2], IPCHandle readEnd, IPCHandle writeEnd);
|
||||
IPCPipeImpl(int (fds)[2], int (secondary)[2], IPCHandle readEnd, IPCHandle writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex);
|
||||
~IPCPipeImpl();
|
||||
|
||||
PROXY_INTERNAL_INTERFACE_(LSHandle::)
|
||||
|
||||
virtual AuSPtr<IO::FS::IAsyncTransaction> NewAsyncTransaction() override;
|
||||
|
||||
virtual AuSPtr<Loop::ILoopSource> AsReadChannelIsOpen() override;
|
||||
virtual AuSPtr<Loop::ILoopSource> AsReadChannelHasData() override;
|
||||
|
||||
virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) override;
|
||||
virtual bool Write(const Memory::MemoryViewStreamRead &read, bool nonblock) override;
|
||||
virtual bool Write(const Memory::MemoryViewStreamRead &read) override;
|
||||
virtual AuString ExportToString() override;
|
||||
|
||||
bool IsSignaled() override;
|
||||
@ -39,13 +44,17 @@ namespace Aurora::IPC
|
||||
|
||||
private:
|
||||
int fds[2] {-1, -1};
|
||||
//Loop::LSHandle lsHandle_;
|
||||
int secondary[2] {-1, -1};
|
||||
|
||||
IPCHandle readEnd_;
|
||||
IPCHandle writeEnd_;
|
||||
AuSPtr<IPCEvent> event_;
|
||||
AuSPtr<IPCMutex> mutex_;
|
||||
};
|
||||
|
||||
IPCPipeImpl::IPCPipeImpl(int (fds2)[2], IPCHandle readEnd, IPCHandle writeEnd) :
|
||||
fds {fds2[0], fds2[1]}, readEnd_(readEnd), writeEnd_(writeEnd)
|
||||
IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCHandle readEnd, IPCHandle writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex) :
|
||||
fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]},
|
||||
readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex)
|
||||
{
|
||||
this->handle = fds[0];
|
||||
}
|
||||
@ -65,8 +74,38 @@ namespace Aurora::IPC
|
||||
IO::UNIX::FDServeEnd(writeEnd_);
|
||||
::close(fd);
|
||||
}
|
||||
|
||||
if ((fd = AuExchange(secondary[0], -1)) != -1)
|
||||
{
|
||||
::close(fd);
|
||||
}
|
||||
|
||||
if ((fd = AuExchange(secondary[1], -1)) != -1)
|
||||
{
|
||||
::close(fd);
|
||||
}
|
||||
|
||||
if (secondary[0] == -1)
|
||||
{
|
||||
event_->Reset();
|
||||
}
|
||||
}
|
||||
|
||||
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelIsOpen()
|
||||
{
|
||||
return this->event_;
|
||||
}
|
||||
|
||||
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelHasData()
|
||||
{
|
||||
return AuSharedFromThis();
|
||||
}
|
||||
|
||||
AuSPtr<IO::FS::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction()
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock)
|
||||
{
|
||||
auto handle = fds[0];
|
||||
@ -108,8 +147,9 @@ namespace Aurora::IPC
|
||||
return true;
|
||||
}
|
||||
|
||||
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read, bool nonblock)
|
||||
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)//, bool nonblock)
|
||||
{
|
||||
bool nonblock = false;
|
||||
auto handle = this->fds[1];
|
||||
auto control = ::fcntl(handle, F_GETFL);
|
||||
auto ref = control;
|
||||
@ -166,45 +206,80 @@ namespace Aurora::IPC
|
||||
|
||||
AuString IPCPipeImpl::ExportToString()
|
||||
{
|
||||
return this->readEnd_.ToString() + "." + this->writeEnd_.ToString();
|
||||
return this->readEnd_.ToString() + "." +
|
||||
this->writeEnd_.ToString() + "." +
|
||||
this->event_->ExportToString() + "." +
|
||||
this->mutex_->ExportToString();
|
||||
}
|
||||
|
||||
AUKN_SYM AuSPtr<IPCPipe> NewPipe()
|
||||
{
|
||||
IPCHandle readEnd, writeEnd;
|
||||
int fds[2];
|
||||
int fds1[2];
|
||||
int fds2[2];
|
||||
|
||||
if (::pipe(fds) == -1)
|
||||
auto event = NewEvent(false, false);
|
||||
if (!event)
|
||||
{
|
||||
SysPushErrorNested();
|
||||
return {};
|
||||
}
|
||||
|
||||
auto mutex = NewMutex();
|
||||
if (!mutex)
|
||||
{
|
||||
SysPushErrorNested();
|
||||
return {};
|
||||
}
|
||||
|
||||
if (::pipe(fds1) == -1)
|
||||
{
|
||||
SysPushErrorIO();
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!IO::UNIX::FDServe(true, true, true, true, fds[0], readEnd))
|
||||
if (::pipe(fds2) == -1)
|
||||
{
|
||||
SysPushErrorIO();
|
||||
::close(fds[0]);
|
||||
::close(fds[1]);
|
||||
::close(fds1[0]);
|
||||
::close(fds1[1]);
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!IO::UNIX::FDServe(true, true, true, true, fds[1], writeEnd))
|
||||
if (!IO::UNIX::FDServe(true, true, true, true, fds1[0], readEnd))
|
||||
{
|
||||
SysPushErrorIO();
|
||||
::close(fds1[0]);
|
||||
::close(fds1[1]);
|
||||
::close(fds2[0]);
|
||||
::close(fds2[1]);
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!IO::UNIX::FDServe(true, true, true, true, fds2[1], writeEnd))
|
||||
{
|
||||
SysPushErrorIO();
|
||||
IO::UNIX::FDServeEnd(readEnd);
|
||||
::close(fds[0]);
|
||||
::close(fds[1]);
|
||||
::close(fds1[0]);
|
||||
::close(fds1[1]);
|
||||
::close(fds2[0]);
|
||||
::close(fds2[1]);
|
||||
return {};
|
||||
}
|
||||
|
||||
auto handle = AuMakeShared<IPCPipeImpl>(fds, readEnd, writeEnd);
|
||||
int serverPair[2] {fds2[0], fds1[1]};
|
||||
int clientPair[2] {fds1[0], fds2[1]};
|
||||
|
||||
auto handle = AuMakeShared<IPCPipeImpl>(serverPair, clientPair, readEnd, writeEnd, event, mutex);
|
||||
if (!handle)
|
||||
{
|
||||
SysPushErrorMem();
|
||||
IO::UNIX::FDServeEnd(readEnd);
|
||||
IO::UNIX::FDServeEnd(writeEnd);
|
||||
::close(fds[0]);
|
||||
::close(fds[1]);
|
||||
::close(fds1[0]);
|
||||
::close(fds1[1]);
|
||||
::close(fds2[0]);
|
||||
::close(fds2[1]);
|
||||
return {};
|
||||
}
|
||||
|
||||
@ -222,8 +297,22 @@ namespace Aurora::IPC
|
||||
return {};
|
||||
}
|
||||
|
||||
auto itr2 = handle.find_first_of('.', itr + 1);
|
||||
if (itr2 == AuString::npos)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
auto itr3 = handle.find_first_of('.', itr2 + 1);
|
||||
if (itr3 == AuString::npos)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
auto readString = handle.substr(0, itr);
|
||||
auto writeString = handle.substr(itr + 1);
|
||||
auto writeString = handle.substr(itr + 1, itr2 - itr - 1);
|
||||
auto eventString = handle.substr(itr2 + 1, itr3 - itr2 - 1);
|
||||
auto mutexString = handle.substr(itr3 + 1);
|
||||
|
||||
if (!readEnd.FromString(readString))
|
||||
{
|
||||
@ -237,6 +326,18 @@ namespace Aurora::IPC
|
||||
return {};
|
||||
}
|
||||
|
||||
auto event = ImportEvent(eventString);
|
||||
if (!event)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
auto mutex = ImportMutex(mutexString);
|
||||
if (!mutex)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!IO::UNIX::FDAccept(readEnd, fds[0]))
|
||||
{
|
||||
SysPushErrorNested();
|
||||
@ -267,7 +368,8 @@ namespace Aurora::IPC
|
||||
return {};
|
||||
}
|
||||
|
||||
auto object = AuMakeShared<IPCPipeImpl>(fds, readEnd, writeEnd);
|
||||
int dummy[2] {-1, -1};
|
||||
auto object = AuMakeShared<IPCPipeImpl>(fds, dummy, readEnd, writeEnd, event, mutex);
|
||||
if (!object)
|
||||
{
|
||||
SysPushErrorMem();
|
||||
@ -278,6 +380,23 @@ namespace Aurora::IPC
|
||||
return {};
|
||||
}
|
||||
|
||||
mutex->WaitOn();
|
||||
|
||||
if (event->IsSignaled())
|
||||
{
|
||||
mutex->Unlock();
|
||||
SysPushErrorIO("Pipe Busy");
|
||||
IO::UNIX::FDServeEnd(readEnd);
|
||||
IO::UNIX::FDServeEnd(writeEnd);
|
||||
::close(fds[0]);
|
||||
::close(fds[1]);
|
||||
return {};
|
||||
}
|
||||
|
||||
event->Set();
|
||||
|
||||
mutex->Unlock();
|
||||
|
||||
return object;
|
||||
}
|
||||
}
|
@ -5,7 +5,6 @@
|
||||
Date: 2021-6-12
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
#include <RuntimeInternal.hpp>
|
||||
#include "Processes.hpp"
|
||||
#include "Open.Unix.hpp"
|
||||
|
Loading…
Reference in New Issue
Block a user