AuroraRuntime/Source/IO/FS/Async.NT.cpp

571 lines
15 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Async.NT.cpp
Date: 2021-9-13
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "FS.hpp"
#include <Source/IO/Loop/LSHandle.hpp>
//#include "IPCHandle.hpp"
#include "Async.NT.hpp"
#include "FileAdvisory.NT.hpp"
#include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/IPC/IPCPipe.NT.hpp>
namespace Aurora::IO::FS
{
struct NtAsyncFileTransactionLoopSource : AuLoop::LSHandle
{
NtAsyncFileTransactionLoopSource(AuSPtr<NtAsyncFileTransaction> that) : caller_(that), Loop::LSHandle(AuUInt(that->GetFileHandle()->handle))
{}
virtual bool IsSignaled() override;
virtual bool OnTrigger(AuUInt handle) override;
virtual AuLoop::ELoopSource GetType() override;
private:
AuWPtr<NtAsyncFileTransaction> caller_;
};
bool NtAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle)
{
auto lock = caller_.lock();
if (lock)
{
return lock->Complete();
}
return true;
}
Loop::ELoopSource NtAsyncFileTransactionLoopSource::GetType()
{
return Loop::ELoopSource::eSourceAIO;
}
bool NtAsyncFileTransactionLoopSource::IsSignaled()
{
auto lock = caller_.lock();
if (lock)
{
return lock->Complete();
}
return LSHandle::IsSignaled();
}
NtAsyncFileTransaction::~NtAsyncFileTransaction()
{
AuWin32CloseHandle(this->event_);
}
FileHandle::~FileHandle()
{
if (this->writeHandle != this->handle)
{
AuWin32CloseHandle(this->writeHandle);
}
this->writeHandle = INVALID_HANDLE_VALUE;
AuWin32CloseHandle(this->handle);
}
bool FileHandle::Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)
{
HANDLE fileHandle;
auto pathex = NormalizePathRet(path);
if (pathex.empty())
{
return false;
}
auto win32Path = Locale::ConvertFromUTF8(pathex);
if (win32Path.empty())
{
return false;
}
auto flags = FILE_FLAG_OVERLAPPED;
if (directIO)
{
flags |= FILE_FLAG_NO_BUFFERING;
}
fileHandle = INVALID_HANDLE_VALUE;
switch (openMode)
{
case EFileOpenMode::eRead:
{
fileHandle = CreateFileW(win32Path.c_str(), GENERIC_READ, NtLockAdvisoryToShare(lock), NULL, OPEN_EXISTING, flags, NULL);
break;
}
case EFileOpenMode::eReadWrite:
{
CreateDirectories(pathex, true);
fileHandle = CreateFileW(win32Path.c_str(), GENERIC_WRITE | GENERIC_READ, NtLockAdvisoryToShare(lock), NULL, OPEN_ALWAYS, flags, NULL);
if (fileHandle == INVALID_HANDLE_VALUE)
{
fileHandle = CreateFileW(win32Path.c_str(), GENERIC_WRITE | GENERIC_READ, NtLockAdvisoryToShare(lock), NULL, CREATE_ALWAYS, flags, NULL);
}
break;
}
case EFileOpenMode::eWrite:
{
CreateDirectories(pathex, true);
fileHandle = CreateFileW(win32Path.c_str(), GENERIC_WRITE, NtLockAdvisoryToShare(lock), NULL, CREATE_ALWAYS, flags, NULL);
break;
}
}
if (fileHandle == INVALID_HANDLE_VALUE)
{
SysPushErrorIO("Missing file: {}", path);
return {};
}
this->directIO = directIO;
this->handle = fileHandle;
this->writeHandle = fileHandle;
this->readOnly = openMode == EFileOpenMode::eRead;
return true;
}
void FileHandle::Init(HANDLE read, HANDLE write)
{
this->directIO = true;
this->handle = read;
this->writeHandle = write;
this->readOnly = false;
}
AuSPtr<FileHandle> NtAsyncFileStream::GetHandle()
{
return handle_;
}
void NtAsyncFileStream::Init(const AuSPtr<FileHandle> &handle)
{
this->handle_ = handle;
}
AuSPtr<IAsyncTransaction> NtAsyncFileStream::NewTransaction()
{
auto shared = AuMakeShared<NtAsyncFileTransaction>();
if (!shared)
{
return {};
}
if (!shared->Init(this->handle_))
{
return {};
}
return shared;
}
bool NtAsyncFileStream::BlockingTruncate(AuUInt64 length)
{
LARGE_INTEGER i {};
i.QuadPart = length;
if (!SetFilePointerEx(this->handle_->handle, i, nullptr, FILE_BEGIN))
{
SysPushErrorIO();
return false;
}
return SetEndOfFile(this->handle_->handle);
}
bool NtAsyncFileStream::BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite &parameters)
{
LARGE_INTEGER i {};
i.QuadPart = offset;
if (!SetFilePointerEx(this->handle_->handle, i, nullptr, FILE_BEGIN))
{
SysPushErrorIO();
return false;
}
DWORD read;
if (!::ReadFile(this->handle_->handle, parameters.ptr, parameters.length, &read, nullptr))
{
SysPushErrorIO();
return false;
}
parameters.outVariable = read;
return true;
}
bool NtAsyncFileStream::BlockingWrite(AuUInt64 offset, const Memory::MemoryViewStreamRead &parameters)
{
LARGE_INTEGER i {};
i.QuadPart = offset;
if (!SetFilePointerEx(this->handle_->handle, i, nullptr, FILE_BEGIN))
{
SysPushErrorIO();
return false;
}
DWORD read;
if (!::WriteFile(this->handle_->handle, parameters.ptr, parameters.length, &read, nullptr))
{
SysPushErrorIO();
return false;
}
parameters.outVariable = read;
return true;
}
bool NtAsyncFileTransaction::Init(const AuSPtr<FileHandle> &handle)
{
this->handle_ = handle;
this->overlap_.hEvent = this->event_ = CreateEventW(nullptr, true, false, nullptr);
return this->overlap_.hEvent != INVALID_HANDLE_VALUE;
}
static bool TranslateNtStatus(NtAsyncFileTransaction *that, BOOL val)
{
if ((val) ||
(!val && (GetLastError() == ERROR_IO_PENDING)))
{
return true;
}
else
{
that->pin_.reset();
that->Reset();
that->osErrorCode = GetLastError();
that->hasFailed = true;
SysPushErrorFIO("QoA async FIO error: {} {}", that->GetFileHandle()->path, that->osErrorCode);
return false;
}
}
static VOID WINAPI GenericCompletionRoutine(
_In_ DWORD dwErrorCode,
_In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped
)
{
auto transaction = reinterpret_cast<NtAsyncFileTransaction *>(reinterpret_cast<AuUInt8*>(lpOverlapped) - offsetof(NtAsyncFileTransaction, overlap_));
auto hold = AuExchange(transaction->pin_, {});
if (dwErrorCode)
{
hold->hasFailed = true;
hold->osErrorCode = dwErrorCode;
}
else if (!hold->lastAbstractStat_)
{
return;
}
SetEvent(lpOverlapped->hEvent);
hold->lastAbstractStat_ = 0;
hold->CompleteEx(dwNumberOfBytesTransfered);
}
bool NtAsyncFileTransaction::IDontWannaUsePorts()
{
// TODO: maybe we could use a semaphore counter of read attempts
// to allow APC callbacks to drag behind
if (AuExchange(this->pin_, AuSharedFromThis()))
{
while (SleepEx(100, true) == WAIT_IO_COMPLETION)
{
}
if (AuExchange(this->pin_, AuSharedFromThis()))
{
SysPushErrorUnavailableError();
return {};
}
}
return true;
}
bool NtAsyncFileTransaction::StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView)
{
if (this->isIrredeemable_)
{
SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object");
return false;
}
if (!IDontWannaUsePorts())
{
return false;
}
if (!memoryView)
{
SysPushErrorArg();
return {};
}
this->latch_ = false;
::ResetEvent(this->event_);
this->memoryHold_ = memoryView;
this->hasFailed = false;
this->lastAbstractStat_ = memoryView->length;
this->lastAbstractOffset_ = offset;
this->lastBytes = 0;
this->overlap_.Offset = offset & 0xFFFFFFFF;
this->overlap_.OffsetHigh = (offset >> 32) & 0xFFFFFFFF;
auto ret = ::ReadFileEx(this->handle_->handle, memoryView->ptr, memoryView->length, &overlap_, GenericCompletionRoutine);
return TranslateNtStatus(this, ret);
}
bool NtAsyncFileTransaction::StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView)
{
if (this->isIrredeemable_)
{
SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object");
return false;
}
if (!IDontWannaUsePorts())
{
return false;
}
if (!memoryView)
{
SysPushErrorArg();
return {};
}
this->latch_ = false;
::ResetEvent(this->event_);
this->hasFailed = false;
this->memoryHold_ = memoryView;
this->lastBytes = 0;
this->lastAbstractStat_ = memoryView->length;
this->lastAbstractOffset_ = offset;
this->overlap_.Offset = offset & 0xFFFFFFFF;
this->overlap_.OffsetHigh = (offset >> 32) & 0xFFFFFFFF;
auto ret = ::WriteFileEx(this->handle_->writeHandle, memoryView->ptr, memoryView->length, &overlap_, GenericCompletionRoutine);
return TranslateNtStatus(this, ret);
}
void NtAsyncFileTransaction::DispatchCb(AuUInt32 read)
{
this->lastBytes = read;
if (AuExchange(this->latch_, true))
{
return;
}
this->lastBytes = read;
auto memoryHold = this->memoryHold_;
if (this->sub_)
{
this->sub_->OnAsyncFileOpFinished(this->lastAbstractOffset_, read);
}
this->memoryHold_.reset();
}
void NtAsyncFileTransaction::Reset()
{
if (this->lastAbstractStat_)
{
this->isIrredeemable_ = true;
this->hasFailed = true;
::CancelIoEx(this->handle_->handle, &this->overlap_);
::SetEvent(this->event_);
this->osErrorCode = ERROR_ABANDONED_WAIT_0;
}
else
{
::ResetEvent(this->event_);
this->hasFailed = false;
}
this->lastBytes = 0;
this->lastAbstractStat_ = 0;
}
bool NtAsyncFileTransaction::Failed()
{
return this->hasFailed && this->osErrorCode != ERROR_BROKEN_PIPE;
}
AuUInt NtAsyncFileTransaction::GetOSErrorCode()
{
return this->hasFailed ? this->osErrorCode : ERROR_SUCCESS;
}
bool NtAsyncFileTransaction::CompleteEx(AuUInt completeRoutine)
{
DWORD read {};
if (this->isIrredeemable_)
{
::ResetEvent(this->event_);
return true;
}
if (!completeRoutine)
{
if (this->GetOSErrorCode() == ERROR_BROKEN_PIPE)
{
auto pipe = this->ntIpcPipeImpl.lock();
DispatchCb(0);
if (pipe)
{
pipe->OnEndOfReadStream();
}
return true;
}
if ((this->hasFailed) ||
::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false))
{
this->lastAbstractStat_ = 0;
bool bLatched = this->latch_;
DispatchCb(read);
return this->hasFailed || read;
}
}
else
{
if (GetLastError() == ERROR_BROKEN_PIPE)
{
auto pipe = this->ntIpcPipeImpl.lock();
DispatchCb(0);
if (pipe)
{
pipe->OnEndOfReadStream();
}
}
else
{
DispatchCb(completeRoutine);
}
return true;
}
return false;
}
bool NtAsyncFileTransaction::Complete()
{
return CompleteEx(false);
}
AuUInt32 NtAsyncFileTransaction::GetLastPacketLength()
{
return this->lastBytes;
}
void NtAsyncFileTransaction::SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub)
{
this->sub_ = sub;
}
bool NtAsyncFileTransaction::Wait(AuUInt32 timeout)
{
if (this->latch_)
{
return true;
}
DWORD ret;
do
{
ret = ::WaitForSingleObjectEx(this->event_, timeout ? timeout : INFINITE, true);
}
while (ret == WAIT_IO_COMPLETION);
if (ret == WAIT_OBJECT_0)
{
return Complete();
}
return false;
}
HANDLE NtAsyncFileTransaction::GetHandle()
{
return this->event_;
}
AuSPtr<FileHandle> NtAsyncFileTransaction::GetFileHandle()
{
return this->handle_;
}
AuSPtr<AuLoop::ILoopSource> NtAsyncFileTransaction::NewLoopSource()
{
return AuMakeShared<NtAsyncFileTransactionLoopSource>(AuSharedFromThis());
}
AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)
{
AuSPtr<FileHandle> fileHandle;
NtAsyncFileStream *stream;
if (path.empty())
{
SysPushErrorParam("Empty path");
return {};
}
if (!EFileOpenModeIsValid(openMode))
{
SysPushErrorParam("Invalid open mode");
return {};
}
fileHandle = AuMakeShared<FileHandle>();
if (!fileHandle->Init(path, openMode, directIO, lock))
{
return {};
}
stream = _new NtAsyncFileStream();
if (!stream)
{
return {};
}
stream->Init(fileHandle);
return stream;
}
AUKN_SYM void OpenAsyncRelease(IAsyncFileStream *handle)
{
AuSafeDelete<NtAsyncFileStream *>(handle);
}
}