AuroraRuntime/Source/IO/FS/Async.Linux.cpp

435 lines
11 KiB
C++
Raw Normal View History

2022-04-13 11:00:35 +00:00
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Async.Linux.cpp
Date: 2022-4-12
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "FS.hpp"
#include "Async.Linux.hpp"
#include "FileAdvisory.Unix.hpp"
#include <Source/Loop/Loop.hpp>
#include <Source/Loop/LSHandle.hpp>
#include <Source/Loop/LSEvent.hpp>
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
#include <unistd.h>
#include <fcntl.h>
namespace Aurora::IO::FS
{
struct LinuxAsyncFileTransaction;
struct LinuxAsyncFileTransactionLoopSource : Loop::LSEvent
{
LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that);
virtual bool IsSignaled() override;
virtual bool OnTrigger(AuUInt handle) override;
virtual Loop::ELoopSource GetType() override;
private:
AuWPtr<LinuxAsyncFileTransaction> caller_;
};
struct FileHandle
{
~FileHandle();
bool Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock);
int handle {-1};
AuString path;
bool readOnly;
bool directIO;
};
struct LinuxAsyncFileStream : public IAsyncFileStream
{
AuSPtr<IAsyncTransaction> NewTransaction() override;
void Init(const AuSPtr<FileHandle> &handle);
AuSPtr<FileHandle> GetHandle();
private:
AuSPtr<FileHandle> handle_;
};
struct LinuxAsyncFileTransaction : IAsyncTransaction, AuEnableSharedFromThis<LinuxAsyncFileTransaction>, Aurora::IO::UNIX::ASubmittable
{
~LinuxAsyncFileTransaction();
bool Init(const AuSPtr<FileHandle> &handle);
bool StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView) override;
bool StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView) override;
bool Complete() override;
AuUInt32 GetLastPacketLength() override;
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) override;
bool Wait(AuUInt32 timeout) override;
AuSPtr<Loop::ILoopSource> NewLoopSource() override;
void DispatchCb();
AuSPtr<FileHandle> GetFileHandle();
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override;
private:
AuSPtr<FileHandle> handle_;
AuUInt32 lastAbstractOffset_ {}, lastFinishedStat_{};
bool latch_ {};
bool bTxFinished_ {};
AuSPtr<IAsyncFinishedSubscriber> sub_;
AuSPtr<LinuxAsyncFileTransactionLoopSource> loopSource_;
};
LinuxAsyncFileTransactionLoopSource::LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that) : caller_(that), Loop::LSEvent(false, false, true)
{
}
bool LinuxAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle)
{
return IsSignaled();
}
bool LinuxAsyncFileTransactionLoopSource::IsSignaled()
{
auto lock = caller_.lock();
if (!lock) return LSEvent::IsSignaled();
return lock->Complete();
}
Loop::ELoopSource LinuxAsyncFileTransactionLoopSource::GetType()
{
return Loop::ELoopSource::eSourceAIO;
}
LinuxAsyncFileTransaction::~LinuxAsyncFileTransaction()
{
}
FileHandle::~FileHandle()
{
if ((this->handle != 0) &&
(this->handle != -1))
{
close(this->handle);
}
}
bool FileHandle::Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)
{
int fileHandle;
auto pathex = NormalizePathRet(path);
if (pathex.empty())
{
return false;
}
fileHandle = ::open(pathex.c_str(),
openMode == EFileOpenMode::eRead ? O_RDONLY : (O_RDWR | O_CREAT),
0664);
if (fileHandle == -1)
{
SysPushErrorIO("Couldn't open file: {} ({}) {}", path, pathex, errno);
return false;
}
if (!ApplyDumbAdvisoryLock(fileHandle, lock))
{
SysPushErrorIO("Couldn't open file: {}. File node (not section) is locked.", path);
return false;
}
2022-04-13 11:00:35 +00:00
this->directIO = directIO;
this->handle = fileHandle;
this->readOnly = openMode == EFileOpenMode::eRead;
return true;
}
AuSPtr<FileHandle> LinuxAsyncFileStream::GetHandle()
{
return handle_;
}
void LinuxAsyncFileStream::Init(const AuSPtr<FileHandle> &handle)
{
this->handle_ = handle;
}
AuSPtr<IAsyncTransaction> LinuxAsyncFileStream::NewTransaction()
{
auto shared = AuMakeShared<LinuxAsyncFileTransaction>();
if (!shared)
{
return {};
}
if (!shared->Init(this->handle_))
{
return {};
}
return shared;
}
bool LinuxAsyncFileTransaction::Init(const AuSPtr<FileHandle> &handle)
{
this->handle_ = handle;
this->loopSource_ = AuMakeShared<LinuxAsyncFileTransactionLoopSource>(AuSharedFromThis());
return bool(this->loopSource_);
}
bool LinuxAsyncFileTransaction::StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView)
{
if (HasState())
{
SysPushErrorIO("IO Operation can not be reused yet.");
return false;
}
auto fd = this->handle_->handle;
if (fd == -1)
{
SysPushErrorUninitialized();
return false;
}
this->latch_ = false;
this->lastAbstractOffset_ = offset;
this->lastFinishedStat_ = 0;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
if (!UNIX::LinuxOverlappedSubmitRead(fd, offset, this, this->loopSource_.get()))
{
LIOS_SendProcess(0, true, errno);
return false;
}
else
{
if (gRuntimeConfig.bFIODisableBatching)
{
UNIX::SendIOBuffers();
}
return true;
}
}
bool LinuxAsyncFileTransaction::StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView)
{
if (HasState())
{
SysPushErrorIO("IO Operation can not be reused yet.");
return false;
}
auto fd = this->handle_->handle;
if (fd == -1)
{
SysPushErrorUninitialized();
return false;
}
this->latch_ = false;
this->lastAbstractOffset_ = offset;
this->lastFinishedStat_ = 0;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
if (!UNIX::LinuxOverlappedSubmitWrite(fd, offset, this, this->loopSource_.get()))
{
LIOS_SendProcess(0, true, errno);
return false;
}
else
{
if (gRuntimeConfig.bFIODisableBatching)
{
UNIX::SendIOBuffers();
}
return true;
}
}
void LinuxAsyncFileTransaction::LIOS_Process(AuUInt32 read, bool failure, int err, bool mark)
{
this->lastFinishedStat_ = read;
this->bTxFinished_ = true;
if (mark)
{
return;
}
this->DispatchCb();
}
void LinuxAsyncFileTransaction::DispatchCb()
{
if (AuExchange(this->latch_, true))
{
SysPushErrorGeneric();
return;
}
if (this->sub_)
{
this->sub_->OnAsyncFileOpFinished(this->lastAbstractOffset_, this->lastFinishedStat_);
}
}
bool LinuxAsyncFileTransaction::Complete()
{
if (this->bTxFinished_)
{
if (!this->latch_)
{
DispatchCb();
}
return true;
}
return false;
}
AuUInt32 LinuxAsyncFileTransaction::GetLastPacketLength()
{
return this->lastFinishedStat_;
}
void LinuxAsyncFileTransaction::SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub)
{
this->sub_ = sub;
}
bool LinuxAsyncFileTransaction::Wait(AuUInt32 timeout)
{
// TODO:
AuList<AuSPtr<IAsyncTransaction>> files {AuUnsafeRaiiToShared(this)};
return WaitMultiple(files, timeout);
}
AuSPtr<FileHandle> LinuxAsyncFileTransaction::GetFileHandle()
{
return this->handle_;
}
AuSPtr<Loop::ILoopSource> LinuxAsyncFileTransaction::NewLoopSource()
{
return AuStaticCast<Loop::ILoopSource>(AuStaticCast<Loop::ILSEvent>(this->loopSource_));
}
AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)
{
AuSPtr<FileHandle> fileHandle;
LinuxAsyncFileStream *stream;
if (path.empty())
{
SysPushErrorParam("Empty path");
return {};
}
if (!EFileOpenModeIsValid(openMode))
{
SysPushErrorParam("Invalid open mode");
return {};
}
fileHandle = AuMakeShared<FileHandle>();
if (!fileHandle->Init(path, openMode, directIO, lock))
{
SysPushErrorNested();
return {};
}
stream = _new LinuxAsyncFileStream();
if (!stream)
{
SysPushErrorMem();
return {};
}
stream->Init(fileHandle);
return stream;
}
AUKN_SYM void OpenAsyncRelease(IAsyncFileStream *handle)
{
AuSafeDelete<LinuxAsyncFileStream *>(handle);
}
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &files, AuUInt32 timeout)
{
AuCtorCode_t code;
AuUInt32 count {};
AuUInt64 endTime = AuTime::CurrentInternalClockMS() + AuUInt64(timeout);
auto waitQueue = AuTryConstruct<AuList<AuSPtr<IAsyncTransaction>>>(code, files);
if (!code)
{
return 0;
}
while (true)
{
if (waitQueue.empty())
{
break;
}
AuUInt32 timeoutMS {};
if (timeout)
{
AuInt64 delta = (AuInt64)endTime - AuTime::CurrentInternalClockMS();
if (delta <= 0)
{
break;
}
timeoutMS = delta;
}
if (!UNIX::LinuxOverlappedPoll(timeoutMS))
{
continue;
}
for (auto itr = waitQueue.begin();
itr != waitQueue.end();)
{
if (AuStaticPointerCast<LinuxAsyncFileTransaction>(*itr)->Complete())
{
count ++;
itr = waitQueue.erase(itr);
}
else
{
itr++;
}
}
if (count)
{
break;
}
}
return count;
}
}