/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: Async.Linux.cpp Date: 2022-4-12 Author: Reece ***/ #include #include "FS.hpp" #include "FileAdvisory.Unix.hpp" #include #include #include #include #include #include #include "FileStream.Unix.hpp" #include "Async.Linux.hpp" #include #include namespace Aurora::IO::FS { #define IPC_PIPE AuStaticCast(this->pHandle_)->pIPCPipe struct LinuxAsyncFileTransactionLoopSource : Aurora::IO::Loop::LSEvent { LinuxAsyncFileTransactionLoopSource(AuSPtr that); virtual bool IsSignaled() override; virtual bool OnTrigger(AuUInt handle) override; virtual AuLoop::ELoopSource GetType() override; virtual const AuList &GetHandles() override; virtual bool Singular() override; private: bool bExMode {}; AuWPtr caller_; AuList handles_; }; LinuxAsyncFileTransactionLoopSource::LinuxAsyncFileTransactionLoopSource(AuSPtr that) : caller_(that), Loop::LSEvent(false, false, true) { if (that) { if (auto pPipe = AuStaticCast(that->GetFileHandle())->pIPCPipe) { this->bExMode = true; this->handles_ = {pPipe->GetPreemptFd(), Loop::LSEvent::GetHandle()}; } } } const AuList &LinuxAsyncFileTransactionLoopSource::GetHandles() { return this->handles_; } bool LinuxAsyncFileTransactionLoopSource::Singular() { return !this->bExMode; } bool LinuxAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle) { auto lock = caller_.lock(); if (lock) { return lock->Complete(); } return true; } bool LinuxAsyncFileTransactionLoopSource::IsSignaled() { if (LSEvent::IsSignaled()) { auto lock = caller_.lock(); return lock->Complete(); } return false; } Loop::ELoopSource LinuxAsyncFileTransactionLoopSource::GetType() { return Loop::ELoopSource::eSourceAIO; } LinuxAsyncFileTransaction::~LinuxAsyncFileTransaction() { } void LinuxAsyncFileStream::Init(const AuSPtr &handle) { this->pHandle_ = handle; } AuSPtr LinuxAsyncFileStream::NewTransaction() { auto shared = AuMakeShared(); if (!shared) { return {}; } if (!shared->Init(this->pHandle_)) { return {}; } return shared; } bool LinuxAsyncFileStream::BlockingTruncate(AuUInt64 length) { auto iOptSafe = this->pHandle_->GetOSWriteHandleSafe(); if (!iOptSafe) { return false; } auto fd = (int)iOptSafe.Value(); if (fd == -1) { SysPushErrorUninitialized(); return false; } return ::ftruncate(fd, length) != -1; } bool LinuxAsyncFileStream::BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite ¶meters) { auto iOptSafe = this->pHandle_->GetOSReadHandleSafe(); if (!iOptSafe) { return false; } auto fd = (int)iOptSafe.Value(); if (fd == -1) { SysPushErrorUninitialized(); return false; } if (IPC_PIPE) { if (IPC_PIPE->LIOS_PopOne()) { parameters.outVariable = 0; return true; } } if (!PosixSetOffset(fd, offset)) { return false; } AuUInt32 read; if (!PosixRead(fd, parameters.ptr, parameters.length, &read)) { return false; } return true; } bool LinuxAsyncFileStream::BlockingWrite(AuUInt64 offset, const Memory::MemoryViewStreamRead ¶meters) { auto iOptSafe = this->pHandle_->GetOSWriteHandleSafe(); if (!iOptSafe) { return false; } auto fd = (int)iOptSafe.Value(); if (fd == -1) { SysPushErrorUninitialized(); return false; } if (!PosixSetOffset(fd, offset)) { return false; } AuUInt32 read; if (!PosixWrite(fd, parameters.ptr, parameters.length, &read)) { return false; } return true; } bool LinuxAsyncFileTransaction::Init(const AuSPtr &handle) { this->pHandle_ = handle; this->loopSource_ = AuMakeShared(AuSharedFromThis()); return bool(this->loopSource_); } void LinuxAsyncFileTransaction::SetBaseOffset(AuUInt64 uBaseOffset) { this->uBaseOffset = uBaseOffset; } bool LinuxAsyncFileTransaction::StartRead(AuUInt64 offset, const AuSPtr &memoryView) { if (HasState()) { SysPushErrorIO("IO Operation can not be reused yet."); return false; } auto iOptSafe = this->pHandle_->GetOSReadHandleSafe(); if (!iOptSafe) { return false; } auto fd = (int)iOptSafe.Value(); if (fd == -1) { SysPushErrorUninitialized(); return false; } this->latch_ = false; this->hasError_ = false; this->bTxFinished_ = false; this->lastFinishedStat_ = 0; if (!this->loopSource_) { SysPushErrorUninitialized(); return false; } this->loopSource_->Reset(); this->lastAbstractOffset_ = offset; LIOS_Init(AuSharedFromThis()); SetMemory(memoryView); if (IPC_PIPE) { if (IPC_PIPE->LIOS_PopOne()) { LIOS_SendProcess(0, false, errno); return true; } } offset += this->uBaseOffset; if (!UNIX::LinuxOverlappedSubmitRead(fd, offset, this, this->loopSource_.get(), bool(IPC_PIPE))) { LIOS_SendProcess(0, true, errno); return true; } else { if (gRuntimeConfig.linuxConfig.bFIODisableBatching) { UNIX::SendIOBuffers(); } return true; } } bool LinuxAsyncFileTransaction::StartWrite(AuUInt64 offset, const AuSPtr &memoryView) { if (HasState()) { SysPushErrorIO("IO Operation can not be reused yet."); return false; } auto iOptSafe = this->pHandle_->GetOSWriteHandleSafe(); if (!iOptSafe) { return false; } auto fd = (int)iOptSafe.Value(); if (fd == -1) { SysPushErrorUninitialized(); return false; } this->latch_ = false; this->bTxFinished_ = false; this->hasError_ = false; this->lastFinishedStat_ = 0; if (!this->loopSource_) { SysPushErrorUninitialized(); return false; } this->loopSource_->Reset(); this->lastAbstractOffset_ = offset; LIOS_Init(AuSharedFromThis()); SetMemory(memoryView); offset += this->uBaseOffset; if (!UNIX::LinuxOverlappedSubmitWrite(fd, offset, this, this->loopSource_.get())) { LIOS_SendProcess(0, true, errno); return false; } else { if (gRuntimeConfig.linuxConfig.bFIODisableBatching) { UNIX::SendIOBuffers(); } return true; } } void LinuxAsyncFileTransaction::Reset() { if (this->loopSource_) { this->loopSource_->Reset(); } } void LinuxAsyncFileTransaction::LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) { this->lastFinishedStat_ = failure ? 0 : read; this->hasError_ = failure; this->error_ = err; this->bTxFinished_ = true; if (mark) { return; } this->DispatchCb(); if (read) { if (IPC_PIPE) { // Return value intentionally ignored // We just need to poke on read... IPC_PIPE->LIOS_PopOne(); } } } void LinuxAsyncFileTransaction::DispatchCb() { if (AuExchange(this->latch_, true)) { // TODO (Reece): urgent //SysPushErrorGeneric(); return; } if (this->sub_) { this->sub_->OnAsyncFileOpFinished(this->lastAbstractOffset_, this->lastFinishedStat_); } } bool LinuxAsyncFileTransaction::Complete() { if (this->bTxFinished_) { if (!this->latch_) { LIOS_SendProcess(this->lastFinishedStat_, this->lastFinishedStat_ == 0, 0, false); //DispatchCb(); } return true; } return false; } bool LinuxAsyncFileTransaction::HasCompleted() { return this->bTxFinished_; } bool LinuxAsyncFileTransaction::Failed() { return this->hasError_; } AuUInt LinuxAsyncFileTransaction::GetOSErrorCode() { return AuUInt(this->error_); } AuUInt32 LinuxAsyncFileTransaction::GetLastPacketLength() { return this->lastFinishedStat_; } void LinuxAsyncFileTransaction::SetCallback(const AuSPtr &sub) { this->sub_ = sub; } bool LinuxAsyncFileTransaction::Wait(AuUInt32 timeout) { // TODO: AuList> files {AuUnsafeRaiiToShared(this)}; return WaitMultiple(files, timeout); } AuSPtr LinuxAsyncFileTransaction::GetFileHandle() { return this->pHandle_; } AuSPtr LinuxAsyncFileTransaction::NewLoopSource() { return AuStaticCast(AuStaticCast(this->loopSource_)); } AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuString &path, EFileOpenMode openMode, bool bDirectIO, EFileAdvisoryLockLevel lock) { auto pHandle = AuIO::IOHandleShared(); if (!pHandle) { SysPushErrorMemory(); return nullptr; } AuIO::IIOHandle::HandleCreate createhandle(path); createhandle.eAdvisoryLevel = lock; createhandle.eMode = openMode; createhandle.bFailIfNonEmptyFile = false; createhandle.bDirectIOMode = bDirectIO; createhandle.bAsyncHandle = true; if (!pHandle->InitFromPath(createhandle)) { SysPushErrorNested(); return nullptr; } auto pStream = _new LinuxAsyncFileStream(); if (!pStream) { SysPushErrorMemory(); return nullptr; } pStream->Init(pHandle); return pStream; } AUKN_SYM void OpenAsyncRelease(IAsyncFileStream *handle) { AuSafeDelete(handle); } }