[+] Linux async process stream access API implementation and bug fixes in AuIOFS / Async.Linux.cpp implementation

This commit is contained in:
Reece Wilson 2022-05-04 20:34:46 +01:00
parent 9ac9b5419a
commit d5ec986e02
6 changed files with 176 additions and 17 deletions

View File

@ -19,12 +19,31 @@ namespace Aurora::IO::FS
virtual bool StartRead(AuUInt64 offset, const AuSPtr<Memory::MemoryViewWrite> &memoryView) = 0;
virtual bool StartWrite(AuUInt64 offset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0;
/**
* @brief Non-blocking is-signaled and call callback poll routine
*/
virtual bool Complete() = 0;
/**
* @brief Returns the last packets length assuming ::Complete() is true or you are within the registered callback
*/
virtual AuUInt32 GetLastPacketLength() = 0;
/**
* @brief Registers an NT-like APC callback for the IO transaction.
* Can be executed under any Aurora loop subsystem sleep
*/
virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) = 0;
/**
* @brief Block for completion
*/
virtual bool Wait(AuUInt32 timeout) = 0;
/**
* @brief Provides a loop source that becomes signaled once the transaction is complete.
* Polling the transaction may result in the execution of the callback.
*/
virtual AuSPtr<Loop::ILoopSource> NewLoopSource() = 0;
};
}

View File

@ -41,6 +41,7 @@ namespace Aurora::Loop
// Process and console APIs
eProcessStdIn,
eProcessStdOut,
eProcessDead,
// window messge loops
eSourceApple,

View File

@ -24,6 +24,12 @@ namespace Aurora::Processes
* @return
*/
virtual AuSPtr<Threading::IWaitable> AsWaitable() = 0;
/**
* @brief Kernel synchronization primitive to synchronize against the process
*
*/
virtual AuSPtr<Loop::ILoopSource> AsLoopSource() = 0;
/**
* @brief returns the exit code of the process should we outlive them

View File

@ -190,9 +190,18 @@ namespace Aurora::IO::FS
}
this->latch_ = false;
this->bTxFinished_ = false;
this->lastFinishedStat_ = 0;
if (!this->loopSource_)
{
SysPushErrorUninitialized();
return false;
}
this->loopSource_->Reset();
this->lastAbstractOffset_ = offset;
this->lastFinishedStat_ = 0;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
@ -229,10 +238,19 @@ namespace Aurora::IO::FS
}
this->latch_ = false;
this->lastAbstractOffset_ = offset;
this->bTxFinished_ = false;
this->lastFinishedStat_ = 0;
if (!this->loopSource_)
{
SysPushErrorUninitialized();
return false;
}
this->loopSource_->Reset();
this->lastAbstractOffset_ = offset;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
@ -255,7 +273,7 @@ namespace Aurora::IO::FS
void LinuxAsyncFileTransaction::LIOS_Process(AuUInt32 read, bool failure, int err, bool mark)
{
this->lastFinishedStat_ = read;
this->lastFinishedStat_ = failure ? 0 : read;
this->bTxFinished_ = true;
if (mark)
{
@ -284,7 +302,8 @@ namespace Aurora::IO::FS
{
if (!this->latch_)
{
DispatchCb();
LIOS_SendProcess(this->lastFinishedStat_, this->lastFinishedStat_ == 0, 0, false);
//DispatchCb();
}
return true;
}

View File

@ -25,6 +25,21 @@ namespace Aurora::Processes
static AuThreadPrimitives::RWLockUnique_t gRWLock;
static AuHashMap<pid_t, ProcessImpl *> gPidLookupMap;
struct ProcessAliveLoopSource : Loop::LSEvent
{
ProcessAliveLoopSource();
virtual AuLoop::ELoopSource GetType() override;
};
ProcessAliveLoopSource::ProcessAliveLoopSource() : LSEvent(false, false, true)
{}
AuLoop::ELoopSource ProcessAliveLoopSource::GetType()
{
return AuLoop::ELoopSource::eProcessDead;
}
ProcessImpl::ProcessImpl(const StartupParmaters &params) : startup_(params)
{
AuIOFS::NormalizePath(this->startup_.process, this->startup_.process);
@ -84,16 +99,21 @@ namespace Aurora::Processes
{
this->finished_->Set();
}
if (this->loopSource_)
{
this->loopSource_->Set();
}
}
void ProcessImpl::ShutdownPipes()
{
if (auto fd = AuExchange(pipeStdErr_[0], {})) close(fd); // ME
//if (auto fd = AuExchange(pipeStdErr_[0], {})) close(fd); // ME
if (auto fd = AuExchange(pipeStdErr_[1], {})) close(fd);
if (auto fd = AuExchange(pipeStdOut_[0], {})) close(fd); // ME
//if (auto fd = AuExchange(pipeStdOut_[0], {})) close(fd); // ME
if (auto fd = AuExchange(pipeStdOut_[1], {})) close(fd);
if (auto fd = AuExchange(pipeStdIn_[0], {})) close(fd);
if (auto fd = AuExchange(pipeStdIn_[1], {})) close(fd); // ME
//if (auto fd = AuExchange(pipeStdIn_[1], {})) close(fd); // ME
}
bool ProcessImpl::TryKill()
@ -128,6 +148,11 @@ namespace Aurora::Processes
return this->finished_;
}
AuSPtr<Loop::ILoopSource> ProcessImpl::AsLoopSource()
{
return this->loopSource_;
}
AuSInt ProcessImpl::GetExitCode()
{
return this->exitCode_;
@ -189,7 +214,25 @@ namespace Aurora::Processes
return false;
}
return write(handle, source.ptr, source.length) == source.length;
auto control = ::fcntl(handle, F_GETFL);
auto ref = control;
if (/*nonblock*/ true)
{
control |= O_NONBLOCK;
}
else
{
control &= ~O_NONBLOCK;
}
if (ref != control)
{
::fcntl(handle, F_SETFL, control);
}
return ::write(handle, source.ptr, source.length) == source.length;
}
bool ProcessImpl::Init()
@ -221,18 +264,66 @@ namespace Aurora::Processes
}
}
this->loopSource_ = AuMakeShared<ProcessAliveLoopSource>();
if (!this->loopSource_)
{
return false;
}
this->finished_ = AuThreadPrimitives::EventShared(false, false, true);
return bool(this->finished_);
if (!this->finished_)
{
return false;
}
if (this->startup_.fwdIn || this->startup_.fwdOut)
{
this->fsHandle_ = AuMakeShared<IO::FS::FileHandle>();
if (!this->fsHandle_)
{
return false;
}
this->fsStream_ = AuMakeShared<ProcessPipeFileStream>();
if (!this->fsStream_)
{
return false;
}
this->fsHandle_->Init(this->pipeStdOut_[0], this->pipeStdIn_[1]);
this->fsStream_->Init(this->fsHandle_);
}
if (this->startup_.fwdErr)
{
this->fsErrorHandle_ = AuMakeShared<IO::FS::FileHandle>();
if (!this->fsErrorHandle_)
{
return false;
}
this->fsErrorStream_ = AuMakeShared<ProcessPipeFileStream>();
if (!this->fsErrorStream_)
{
return false;
}
this->fsErrorHandle_->Init(this->pipeStdErr_[0], -1);
this->fsErrorStream_->Init(this->fsErrorHandle_);
}
return true;
}
AuSPtr<AuIOFS::IAsyncTransaction> ProcessImpl::NewAsyncTransaction()
{
return {};
return this->fsStream_ ? this->fsStream_->NewTransaction() : AuSPtr<AuIOFS::IAsyncTransaction> {};
}
AuSPtr<AuIOFS::IAsyncTransaction> ProcessImpl::NewErrorStreamAsyncTransaction()
{
return {};
return this->fsErrorStream_ ? this->fsErrorStream_->NewTransaction() : AuSPtr<AuIOFS::IAsyncTransaction> {};
}
bool ProcessImpl::Start()
@ -241,7 +332,7 @@ namespace Aurora::Processes
if (this->type_ == ESpawnType::eSpawnOvermap)
{
execv(this->startup_.process.c_str(), (char *const *)this->cargs_.data()); // https://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
::execv(this->startup_.process.c_str(), (char *const *)this->cargs_.data()); // https://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
SysPushErrorGen("execv didn't overwrite the process map, given {} ({})", this->startup_.process, this->debug_);
return false;
}
@ -276,11 +367,12 @@ namespace Aurora::Processes
if (this->type_ != ESpawnType::eSpawnChildProcessWorker)
{
setsid();
::setsid();
}
execv(this->startup_.process.c_str(), (char * const *)this->cargs_.data()); // https://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
SysPushErrorGen("execv didn't overwrite the process map, given {} ({})", this->startup_.process, this->debug_);
::execv(this->startup_.process.c_str(), (char * const *)this->cargs_.data()); // https://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
SysPushErrorGen("execv didn't overwrite the process map. Launch: {} ({})", this->startup_.process, this->debug_);
return false;
}
else if (pid > 0)

View File

@ -6,6 +6,19 @@
Date: 2021-6-12
Author: Reece
***/
#pragma once
#include <Source/Loop/LSEvent.hpp>
#include <Source/IO/FS/FS.hpp>
#if defined(AURORA_IS_LINUX_DERIVED)
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
#include <Source/IO/FS/Async.Linux.hpp>
using ProcessPipeFileStream = Aurora::IO::FS::LinuxAsyncFileStream;
#else
#include <Source/IO/FS/Async.Unix.hpp>
using ProcessPipeFileStream = Aurora::IO::FS::UnixAsyncFileStream;
#endif
namespace Aurora::Processes
{
@ -22,7 +35,8 @@ namespace Aurora::Processes
bool TryKill() override;
bool Terminate() override;
AuSPtr<Aurora::Threading::IWaitable> AsWaitable() override;
AuSPtr<Threading::IWaitable> AsWaitable() override;
AuSPtr<Loop::ILoopSource> AsLoopSource() override;
AuSInt GetExitCode() override;
@ -45,6 +59,14 @@ namespace Aurora::Processes
int pipeStdErr_[2]{};
int pipeStdIn_ [2]{};
AuSPtr<Loop::ILSEvent> loopSource_;
AuSPtr<IO::FS::FileHandle> fsHandle_;
AuSPtr<ProcessPipeFileStream> fsStream_;
AuSPtr<IO::FS::FileHandle> fsErrorHandle_;
AuSPtr<ProcessPipeFileStream> fsErrorStream_;
AuThreadPrimitives::EventShared_t finished_;
StartupParmaters startup_;