AuroraRuntime/Source/IO/FS/Async.NT.cpp

754 lines
19 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Async.NT.cpp
Date: 2021-9-13
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "FS.hpp"
#include <Source/IO/Loop/LSHandle.hpp>
//#include "IPCHandle.hpp"
#include "Async.NT.hpp"
#include "FileAdvisory.NT.hpp"
#include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
#include <Source/IO/IPC/AuIPCPipe.NT.hpp>
namespace Aurora::IO::FS
{
// Loop source built around the event handle of an NtAsyncFileTransaction.
// Only a weak reference to the transaction is kept.
struct NtAsyncFileTransactionLoopSource : AuLoop::LSHandle
{
    // Initializers are listed in the order they actually run: base first, then members.
    NtAsyncFileTransactionLoopSource(AuSPtr<NtAsyncFileTransaction> that) :
        Loop::LSHandle(AuUInt(that->event)),
        caller_(that)
    {
    }

    virtual bool IsSignaled() override;
    virtual bool OnTrigger(AuUInt handle) override;
    virtual AuLoop::ELoopSource GetType() override;

private:
    // Weak back-reference to the transaction this source was created from.
    AuWPtr<NtAsyncFileTransaction> caller_;
};
// Forwards the trigger to the transaction's Complete() while the transaction
// is still alive; reports true when the transaction no longer exists.
bool NtAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle)
{
    if (auto pTransaction = caller_.lock())
    {
        return pTransaction->Complete();
    }

    return true;
}
// Identifies this loop source as an async IO source.
Loop::ELoopSource NtAsyncFileTransactionLoopSource::GetType()
{
    const auto eType = Loop::ELoopSource::eSourceAIO;
    return eType;
}
// Asks the live transaction whether it has completed; falls back to the
// plain handle check of the base class once the transaction is gone.
bool NtAsyncFileTransactionLoopSource::IsSignaled()
{
    if (auto pTransaction = caller_.lock())
    {
        return pTransaction->Complete();
    }

    return LSHandle::IsSignaled();
}
// Closes the event handle, but only while this transaction still owns it.
NtAsyncFileTransaction::~NtAsyncFileTransaction()
{
    if (!this->bOwnsEvent_)
    {
        return;
    }

    AuWin32CloseHandle(this->event);
}
// Returns the IO handle this stream was initialised with.
AuSPtr<IIOHandle> NtAsyncFileStream::GetHandle()
{
    AuSPtr<IIOHandle> pResult = this->pHandle_;
    return pResult;
}
// Stores the IO handle that every later operation on this stream will use.
void NtAsyncFileStream::Init(const AuSPtr<IIOHandle> &pHandle)
{
    auto &pTarget = this->pHandle_;
    pTarget = pHandle;
}
// Creates a transaction object for this stream.
// Async handles get a native NtAsyncFileTransaction. Any other handle is
// wrapped by an adapter over a (cached) blocking file stream.
// Returns an empty pointer on failure.
AuSPtr<IAsyncTransaction> NtAsyncFileStream::NewTransaction()
{
    if (this->pHandle_->IsAsync())
    {
        auto pTransaction = AuMakeShared<NtAsyncFileTransaction>();
        SysCheckNotNullMemory(pTransaction, {});

        //if (!pTransaction->InitWeak(this->pHandle_))
        if (!pTransaction->Init(this->pHandle_))
        {
            return {};
        }

        return pTransaction;
    }

    // Blocking handle: lazily open the blocking stream once and keep it in pCache_.
    if (!this->pCache_)
    {
        this->pCache_ = OpenBlockingFileStreamFromHandleShared(this->pHandle_);
        if (!this->pCache_)
        {
            SysPushErrorNested();
            return {};
        }
    }

    auto &pBlockingStream = this->pCache_;
    auto pSeekingReader = AuSharedPointerFromShared(pBlockingStream->ToStreamSeekingReader(), pBlockingStream);
    auto pSeekingWriter = AuSharedPointerFromShared(pBlockingStream->ToStreamSeekingWriter(), pBlockingStream);
    return Adapters::NewAsyncTransactionFromStreamSeekingPair(pSeekingReader, pSeekingWriter, {});
}
// Sets the file size to `length` bytes by moving the file pointer to that
// offset and then marking it as the end of the file.
// Returns false (with an IO error pushed) if either step fails.
bool NtAsyncFileStream::BlockingTruncate(AuUInt64 length)
{
    LARGE_INTEGER i {};
    i.QuadPart = length;

    auto hHandle = (HANDLE)this->pHandle_->GetOSHandle();

    if (!SetFilePointerEx(hHandle, i, nullptr, FILE_BEGIN))
    {
        SysPushErrorIO();
        return false;
    }

    // Report a SetEndOfFile failure the same way as the seek failure above
    // instead of silently returning the raw BOOL.
    if (!SetEndOfFile(hHandle))
    {
        SysPushErrorIO();
        return false;
    }

    return true;
}
// Synchronously reads up to parameters.length bytes at `offset` into
// parameters.ptr and stores the number of bytes read in parameters.outVariable.
// Returns false on failure; outVariable is left at zero in that case.
bool NtAsyncFileStream::BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite &parameters)
{
    LARGE_INTEGER i {};
    i.QuadPart = offset;
    parameters.outVariable = 0;

    auto hOptSafe = this->pHandle_->GetOSReadHandleSafe();
    if (!hOptSafe)
    {
        return false;
    }

    auto hHandle = (HANDLE)hOptSafe.Value();

    if (!SetFilePointerEx(hHandle, i, nullptr, FILE_BEGIN))
    {
        SysPushErrorIO();
        return false;
    }

    OVERLAPPED a {};
    // Whenever an OVERLAPPED structure is supplied, ReadFile takes the start
    // position from it rather than from the file pointer, so the requested
    // offset has to be written here or the read always starts at zero.
    a.Offset = i.LowPart;
    a.OffsetHigh = static_cast<DWORD>(i.HighPart);
    a.hEvent = CreateEventA(NULL, true, 0, NULL);
    if (!a.hEvent)
    {
        // CreateEvent reports failure with NULL.
        SysPushErrorIO();
        return false;
    }

    DWORD read {};
    if (!::ReadFile(hHandle, parameters.ptr, parameters.length, NULL, &a) &&
        ::GetLastError() != ERROR_IO_PENDING)
    {
        SysPushErrorIO();
        ::CloseHandle(a.hEvent);
        return false;
    }

    // GetOverlappedResult with bWait = true blocks until the operation is
    // done, so no separate wait on the event is needed.
    if (!::GetOverlappedResult(hHandle, &a, &read, true))
    {
        ::CloseHandle(a.hEvent);
        return false;
    }

    ::CloseHandle(a.hEvent);
    parameters.outVariable = read;
    return true;
}
// Synchronously writes parameters.length bytes from parameters.ptr at
// `offset` and stores the number of bytes written in parameters.outVariable.
// Returns false on failure; outVariable is left at zero in that case.
bool NtAsyncFileStream::BlockingWrite(AuUInt64 offset, const Memory::MemoryViewStreamRead &parameters)
{
    LARGE_INTEGER i {};
    i.QuadPart = offset;
    parameters.outVariable = 0;

    auto hOptSafe = this->pHandle_->GetOSWriteHandleSafe();
    if (!hOptSafe)
    {
        return false;
    }

    auto hHandle = (HANDLE)hOptSafe.Value();

    if (!SetFilePointerEx(hHandle, i, nullptr, FILE_BEGIN))
    {
        SysPushErrorIO();
        return false;
    }

    OVERLAPPED a {};
    // Whenever an OVERLAPPED structure is supplied, WriteFile takes the start
    // position from it rather than from the file pointer, so the requested
    // offset has to be written here or the write always lands at zero.
    a.Offset = i.LowPart;
    a.OffsetHigh = static_cast<DWORD>(i.HighPart);
    a.hEvent = CreateEventA(NULL, true, 0, NULL);
    if (!a.hEvent)
    {
        // CreateEvent reports failure with NULL.
        SysPushErrorIO();
        return false;
    }

    DWORD written {};
    if (!::WriteFile(hHandle, parameters.ptr, parameters.length, NULL, &a) &&
        ::GetLastError() != ERROR_IO_PENDING)
    {
        SysPushErrorIO();
        ::CloseHandle(a.hEvent);
        return false;
    }

    // GetOverlappedResult with bWait = true blocks until the operation is
    // done, so no separate wait on the event is needed.
    if (!::GetOverlappedResult(hHandle, &a, &written, true))
    {
        ::CloseHandle(a.hEvent);
        return false;
    }

    ::CloseHandle(a.hEvent);
    parameters.outVariable = written;
    return true;
}
// Binds the transaction to `handle` through a weak reference and creates the
// manual-reset, initially unsignaled completion event.
// Returns false when the event could not be created.
bool NtAsyncFileTransaction::InitWeak(const AuSPtr<IIOHandle> &handle)
{
    this->wpHandle_ = handle;
    this->overlap.hEvent = this->event = CreateEventW(nullptr, true, false, nullptr);
    // CreateEventW signals failure with NULL, not INVALID_HANDLE_VALUE.
    return this->event != nullptr &&
           this->event != INVALID_HANDLE_VALUE;
}
// Binds the transaction to `handle` through a strong reference and creates
// the manual-reset, initially unsignaled completion event.
// Returns false when the event could not be created.
bool NtAsyncFileTransaction::Init(const AuSPtr<IIOHandle> &handle)
{
    this->pHandle_ = handle;
    this->overlap.hEvent = this->event = CreateEventW(nullptr, true, false, nullptr);
    // CreateEventW signals failure with NULL, not INVALID_HANDLE_VALUE.
    return this->event != nullptr &&
           this->event != INVALID_HANDLE_VALUE;
}
// Resets the OVERLAPPED member for a new operation while keeping the event
// handle that was stored in it.
void NtAsyncFileTransaction::ResetAIO()
{
    const auto hPreservedEvent = this->overlap.hEvent;

    AuResetMember(this->overlap);

    this->overlap.hEvent = hPreservedEvent;
}
// Interprets the BOOL result of ReadFileEx/WriteFileEx for the transaction `that`.
// Returns true when the operation was accepted (started, pending, or ended
// by EOF / broken pipe) and false on any other error.
static bool TranslateNtStatus(NtAsyncFileTransaction *that, BOOL val)
{
// Captured first, before any other call can overwrite the thread's last error.
auto er = GetLastError();
if (val)
{
//(void)that->Complete();
return true;
}
else if (er == ERROR_IO_PENDING)
{
return true;
}
else if (er == ERROR_BROKEN_PIPE ||
er == ERROR_HANDLE_EOF)
{
// End of stream: signal the event and dispatch a zero-length completion.
SetEvent(that->event);
// also required:
that->bHasFailed = true; // to pass completion
that->dwOsErrorCode = er; // to suppress actual error condition
// The pipe reference is taken before the callback is dispatched.
auto pipe = AuTryLockMemoryType(that->pNtIpcPipeImpl);
that->DispatchCb(0);
if (pipe)
{
pipe->OnEndOfReadStream();
}
// No operation is outstanding: drop the buffer views and the self pin.
AuResetMember(that->readView);
AuResetMember(that->writeView);
that->pPin.reset();
return true;
}
else
{
// Genuine failure: unpin, reset the transaction state, then record the error.
// Reset() is called before the fields below are written, so its own writes
// to bHasFailed / dwOsErrorCode are overridden here.
that->pPin.reset();
that->Reset();
that->dwOsErrorCode = er;
that->bHasFailed = true;
SysPushErrorFIO("QoA async FIO error: {} {}", /*that->GetFileHandle()->path*/ "", that->dwOsErrorCode);
return false;
}
}
// Completion routine handed to ReadFileEx/WriteFileEx by StartRead/StartWrite.
// dwErrorCode: OS status of the finished operation (zero on success).
// dwNumberOfBytesTransfered: number of bytes moved by the operation.
// lpOverlapped: the `overlap` member of the issuing NtAsyncFileTransaction.
static void WINAPI FileOperationCompletion(
DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
// Recover the owning transaction from the address of its embedded OVERLAPPED.
auto transaction = reinterpret_cast<NtAsyncFileTransaction *>(reinterpret_cast<AuUInt8*>(lpOverlapped) - offsetof(NtAsyncFileTransaction, overlap));
// Take the self pin out of the transaction; `hold` keeps the object alive
// for the rest of this function.
auto hold = AuExchange(transaction->pPin, {});
if (!hold)
{
// No pin was set: nothing to complete.
return;
}
if (dwErrorCode)
{
hold->bHasFailed = true;
hold->dwOsErrorCode = dwErrorCode;
}
else if (!hold->dwLastAbstractStat)
{
// Successful operation with dwLastAbstractStat already cleared
// (DispatchCb, CompleteEx and Reset all zero it): nothing more to do.
return;
}
SetEvent(lpOverlapped->hEvent);
// Move the buffer views out of the transaction before completing; the
// locals are released when this function returns.
auto pStupid1 = AuExchange(hold->readView, {});
auto pStupid2 = AuExchange(hold->writeView, {});
hold->CompleteEx(dwNumberOfBytesTransfered, true);
}
// Pins the transaction to itself (pPin) ahead of a new operation.
// If a pin was already present, pending APCs are drained with alertable
// zero-length sleeps and the pin is taken again. When a pin is still
// present after that, an "unavailable" error is pushed and false is returned.
bool NtAsyncFileTransaction::IDontWannaUsePorts()
{
    if (!AuExchange(this->pPin, AuSharedFromThis()))
    {
        return true;
    }

    // Give queued completion routines a chance to run and release the old pin.
    while (SleepEx(0, true) == WAIT_IO_COMPLETION)
    {
    }

    if (!AuExchange(this->pPin, AuSharedFromThis()))
    {
        return true;
    }

    SysPushErrorUnavailableError();
    return false;
}
// Starts an asynchronous read of memoryView.length bytes at `offset`
// (plus the configured base offset) into `memoryView`.
// Returns true when the read was started or has already finished (including
// end-of-stream), and false when the request was rejected or failed to start.
bool NtAsyncFileTransaction::StartRead(AuUInt64 offset, const AuMemoryViewWrite &memoryView)
{
    if (this->isIrredeemable_)
    {
        SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object");
        return false;
    }

    if (!IDontWannaUsePorts())
    {
        return false;
    }

    // From here on pPin holds a reference to this object. Every rejection
    // below must release it: no completion routine will run to do so, and a
    // stale pin would leak the object and block every later Start* call.
    if (!memoryView)
    {
        this->pPin.reset();
        SysPushErrorArg();
        return false;
    }

    if (this->readView ||
        this->writeView)
    {
        this->pPin.reset();
        SysPushErrorIO("IO Operation in progress");
        return false;
    }

    auto pHandle = this->GetFileHandle();
    if (!pHandle)
    {
        // GetFileHandle() can come back empty when only a weak handle is held.
        this->pPin.reset();
        SysPushErrorIO();
        return false;
    }

    auto optRead = pHandle->GetOSReadHandleSafe();
    if (!optRead)
    {
        this->pPin.reset();
        return false;
    }

    this->bLatch = false;

    if (this->bOwnsEvent_)
    {
        ::ResetEvent(this->event);
    }

    // The destination buffer of a read is kept in writeView.
    this->writeView = memoryView;
    this->bHasFailed = false;
    this->dwLastAbstractStat = memoryView.length;
    this->qwLastAbstractOffset = offset;
    this->dwLastBytes = 0;

    this->ResetAIO();

    offset += this->uBaseOffset;
    this->overlap.Offset = AuBitsToLower(offset);
    this->overlap.OffsetHigh = AuBitsToHigher(offset);

    auto ret = ::ReadFileEx((HANDLE)optRead.value(), memoryView.ptr, memoryView.length, &this->overlap, FileOperationCompletion);
    return TranslateNtStatus(this, ret);
}
// Starts an asynchronous write of memoryView.length bytes from `memoryView`
// at `offset` (plus the configured base offset).
// Returns true when the write was started or has already finished, and
// false when the request was rejected or failed to start.
bool NtAsyncFileTransaction::StartWrite(AuUInt64 offset, const AuMemoryViewRead &memoryView)
{
    if (this->isIrredeemable_)
    {
        SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object");
        return false;
    }

    if (!IDontWannaUsePorts())
    {
        return false;
    }

    // From here on pPin holds a reference to this object. Every rejection
    // below must release it: no completion routine will run to do so, and a
    // stale pin would leak the object and block every later Start* call.
    if (!memoryView)
    {
        this->pPin.reset();
        SysPushErrorArg();
        return false;
    }

    if (this->readView ||
        this->writeView)
    {
        this->pPin.reset();
        SysPushErrorIO("IO Operation in progress");
        return false;
    }

    auto pHandle = this->GetFileHandle();
    if (!pHandle)
    {
        // GetFileHandle() can come back empty when only a weak handle is held.
        this->pPin.reset();
        SysPushErrorIO();
        return false;
    }

    auto optWrite = pHandle->GetOSWriteHandleSafe();
    if (!optWrite)
    {
        this->pPin.reset();
        return false;
    }

    this->bLatch = false;

    if (this->bOwnsEvent_)
    {
        ::ResetEvent(this->event);
    }

    this->bHasFailed = false;
    // The source buffer of a write is kept in readView.
    this->readView = memoryView;
    this->dwLastBytes = 0;
    this->dwLastAbstractStat = memoryView.length;
    this->qwLastAbstractOffset = offset;

    this->ResetAIO();

    offset += this->uBaseOffset;
    this->overlap.Offset = AuBitsToLower(offset);
    this->overlap.OffsetHigh = AuBitsToHigher(offset);

    auto ret = ::WriteFileEx((HANDLE)optWrite.value(), memoryView.ptr, memoryView.length, &this->overlap, FileOperationCompletion);
    return TranslateNtStatus(this, ret);
}
// Records the byte count of the finished operation and notifies the
// subscriber. bLatch ensures the subscriber is called at most once per
// operation; later calls only refresh the recorded state.
void NtAsyncFileTransaction::DispatchCb(AuUInt32 read)
{
    this->dwLastAbstractStat = 0;
    this->dwLastBytes = read;

    const bool bAlreadyDispatched = AuExchange(this->bLatch, true);
    if (bAlreadyDispatched)
    {
        return;
    }

    if (!this->pSub_)
    {
        return;
    }

    this->pSub_->OnAsyncFileOpFinished(this->qwLastAbstractOffset, read);
}
// Resets the transaction state.
// If an operation is still marked outstanding (dwLastAbstractStat != 0) it is
// cancelled and the transaction becomes permanently unusable (isIrredeemable_).
// Otherwise the failure flag is cleared and, when owned, the event is reset.
void NtAsyncFileTransaction::Reset()
{
if (this->dwLastAbstractStat)
{
this->isIrredeemable_ = true;
this->bHasFailed = true;
// NOTE(review): the read handle is used for cancellation even when the
// outstanding operation is a write, and GetFileHandle() is dereferenced
// without a null check - confirm both are intended.
auto hOptSafe = this->GetFileHandle()->GetOSReadHandleSafe();
if (hOptSafe)
{
auto hHandle = (HANDLE)hOptSafe.Value();
// Prefer cancelling just this request when pCancelIoEx is available;
// otherwise fall back to CancelIo on the handle.
if (pCancelIoEx)
{
pCancelIoEx(hHandle, &this->overlap);
}
else
{
::CancelIo(hHandle);
}
}
// Signal the event so that anything waiting on it wakes up.
::SetEvent(this->event);
this->dwOsErrorCode = ERROR_ABANDONED_WAIT_0;
}
else
{
if (this->bOwnsEvent_)
{
::ResetEvent(this->event);
}
this->bHasFailed = false;
}
// Common tail: forget the last result and release both buffer views.
this->dwLastBytes = 0;
this->dwLastAbstractStat = 0;
AuResetMember(this->readView);
AuResetMember(this->writeView);
}
// Attaches this transaction to a completion group: the privately owned event
// is closed and replaced by the group's trigger event, and the transaction
// is registered as a work item of the group.
// Returns false when the event is no longer owned, when no group was given,
// or when the group has no trigger loop source.
bool NtAsyncFileTransaction::TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
{
    if (!this->bOwnsEvent_)
    {
        return false;
    }

    if (!pCompletionGroup)
    {
        return false;
    }

    auto pTrigger = pCompletionGroup->GetTriggerLoopSource();
    if (!pTrigger)
    {
        return false;
    }

    // Give up the private event and adopt the group's one.
    this->bOwnsEvent_ = false;
    AuWin32CloseHandle(this->event);

    this->event = this->overlap.hEvent = (HANDLE)AuStaticCast<Loop::LSEvent>(pTrigger)->GetHandle();

    pCompletionGroup->AddWorkItem(this->SharedFromThis());
    this->pCompletionGroup_ = pCompletionGroup;
    return true;
}
// Exposes this transaction through its completion group work handle interface.
CompletionGroup::ICompletionGroupWorkHandle *NtAsyncFileTransaction::ToCompletionGroupHandle()
{
    CompletionGroup::ICompletionGroupWorkHandle *pWorkHandle = this;
    return pWorkHandle;
}
// Returns the completion group this transaction is attached to, if any.
AuSPtr<CompletionGroup::ICompletionGroup> NtAsyncFileTransaction::GetCompletionGroup()
{
    AuSPtr<CompletionGroup::ICompletionGroup> pGroup = this->pCompletionGroup_;
    return pGroup;
}
// Reports whether the last operation failed. Broken-pipe and end-of-file
// codes are not counted as failures.
bool NtAsyncFileTransaction::HasFailed()
{
    if (!this->bHasFailed)
    {
        return false;
    }

    const auto dwCode = this->dwOsErrorCode;
    const bool bEndOfStream = (dwCode == ERROR_BROKEN_PIPE) ||
                              (dwCode == ERROR_HANDLE_EOF);
    return !bEndOfStream;
}
// Returns the recorded OS error code while the failure flag is set,
// ERROR_SUCCESS otherwise.
AuUInt NtAsyncFileTransaction::GetOSErrorCode()
{
    if (this->bHasFailed)
    {
        return this->dwOsErrorCode;
    }

    return ERROR_SUCCESS;
}
// Attempts to complete the current operation and dispatch its callback.
// completeRoutine: byte count reported by the completion routine; zero means
//                  "poll the OS for the result" instead.
// bForce: when polling, accept a successful zero-byte result as complete.
// Returns true when the operation is considered complete.
bool NtAsyncFileTransaction::CompleteEx(AuUInt completeRoutine, bool bForce)
{
DWORD read {};
this->dwLastAbstractStat = 0;
if (this->isIrredeemable_)
{
// The transaction was abandoned by Reset(): just clear the event.
::ResetEvent(this->event);
return true;
}
if (!completeRoutine)
{
// Polling path: query the overlapped result without blocking.
// NOTE(review): the read handle is queried even for write operations - confirm.
auto hOptSafe = this->GetFileHandle()->GetOSReadHandleSafe();
if (hOptSafe)
{
auto hHandle = (HANDLE)hOptSafe.Value();
if (::GetOverlappedResult(hHandle,
&this->overlap,
&read,
false) && (read || bForce))
{
SetEvent(this->overlap.hEvent);
DispatchCb(read);
return true;
}
}
}
else
{
// A non-zero byte count was supplied by the caller.
if (this->dwOsErrorCode == ERROR_BROKEN_PIPE ||
this->dwOsErrorCode == ERROR_HANDLE_EOF)
{
// End of stream: dispatch a zero-length completion, then notify the pipe.
auto pPipe = AuTryLockMemoryType(this->pNtIpcPipeImpl);
DispatchCb(0);
if (pPipe)
{
pPipe->OnEndOfReadStream();
}
}
else
{
DispatchCb(completeRoutine);
}
return true;
}
// Polling did not produce a result: fall back to the recorded state.
return bool(this->dwLastBytes) ||
this->bHasFailed;
}
// Polls for completion: CompleteEx with a zero completion-routine byte count.
bool NtAsyncFileTransaction::Complete()
{
    const bool bCompleted = CompleteEx(false);
    return bCompleted;
}
// Completion query for the completion group work item interface; same
// answer as HasCompleted().
bool NtAsyncFileTransaction::HasCompletedForGCWI()
{
    const bool bDone = this->HasCompleted();
    return bDone;
}
// Detaches from the completion group: both event handle copies are set to
// INVALID_HANDLE_VALUE and the group reference is released.
void NtAsyncFileTransaction::CleanupForGCWI()
{
    this->event = INVALID_HANDLE_VALUE;
    this->overlap.hEvent = this->event;

    AuResetMember(this->pCompletionGroup_);
}
// True once the last operation transferred at least one byte or the
// failure flag has been set.
bool NtAsyncFileTransaction::HasCompleted()
{
    if (bool(this->dwLastBytes))
    {
        return true;
    }

    return this->bHasFailed;
}
// Returns the byte count recorded for the last dispatched operation.
AuUInt32 NtAsyncFileTransaction::GetLastPacketLength()
{
    const AuUInt32 uLength = this->dwLastBytes;
    return uLength;
}
// Installs the subscriber that DispatchCb notifies when an operation finishes.
void NtAsyncFileTransaction::SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub)
{
    auto &pSubscriber = this->pSub_;
    pSubscriber = sub;
}
// Sets the offset that StartRead/StartWrite add to every requested position.
void NtAsyncFileTransaction::SetBaseOffset(AuUInt64 uBaseOffset)
{
    auto &uTarget = this->uBaseOffset;
    uTarget = uBaseOffset;
}
// Waits alertably on the transaction's event.
// timeout: wait time passed to the OS; zero means wait indefinitely.
// Returns true immediately when the callback has already been dispatched,
// the result of Complete() when the event was signaled, false otherwise.
bool NtAsyncFileTransaction::Wait(AuUInt32 timeout)
{
    if (this->bLatch)
    {
        return true;
    }

    for (;;)
    {
        const DWORD dwStatus = ::WaitForSingleObjectEx(this->event, timeout ? timeout : INFINITE, true);

        // The wait was interrupted to run a completion routine: wait again.
        if (dwStatus == WAIT_IO_COMPLETION)
        {
            continue;
        }

        if (dwStatus == WAIT_OBJECT_0)
        {
            return Complete();
        }

        return false;
    }
}
// Returns the event handle used to signal completion of this transaction.
HANDLE NtAsyncFileTransaction::GetHandle()
{
    HANDLE hEvent = this->event;
    return hEvent;
}
// Returns the strongly held IO handle when there is one; otherwise tries to
// lock the weak handle (which may yield an empty pointer).
AuSPtr<IIOHandle> NtAsyncFileTransaction::GetFileHandle()
{
    if (this->pHandle_)
    {
        return this->pHandle_;
    }

    return AuTryLockMemoryType(this->wpHandle_);
}
// Provides a loop source that can be used to wait for this transaction.
// - While the event is privately owned, a new NtAsyncFileTransactionLoopSource is created.
// - When attached to a completion group, the group's loop source is returned.
// - Otherwise an empty pointer is returned.
AuSPtr<AuLoop::ILoopSource> NtAsyncFileTransaction::NewLoopSource()
{
if (this->bOwnsEvent_)
{
// NOTE(review): SysCheckRetExpNotNullMemory is defined elsewhere; presumably it
// returns the expression when non-null and the second argument otherwise - confirm.
SysCheckRetExpNotNullMemory(AuMakeShared<NtAsyncFileTransactionLoopSource>(AuSharedFromThis()), {});
}
else if (this->pCompletionGroup_)
{
return this->pCompletionGroup_->ToAnyLoopSource();
}
else
{
return {};
}
}
// Opens `path` as an async file stream.
// openMode: how the file is opened.
// optbDirectIO: direct IO mode; defaults to true when not supplied.
// optLock: advisory lock level; defaults to eNoSafety when not supplied.
// Returns a stream to be released with OpenAsyncRelease, or nullptr on failure.
AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuROString &path, EFileOpenMode openMode, AuOptional<bool> optbDirectIO, AuOptional<EFileAdvisoryLockLevel> optLock)
{
    auto pIOHandle = AuIO::IOHandleShared();
    if (!pIOHandle)
    {
        SysPushErrorMemory();
        return nullptr;
    }

    AuIO::IIOHandle::HandleCreate request(path);
    request.eMode = openMode;
    request.eAdvisoryLevel = optLock.ValueOr(EFileAdvisoryLockLevel::eNoSafety);
    request.bDirectIOMode = optbDirectIO.ValueOr(true);
    request.bFailIfNonEmptyFile = false;
    request.bAsyncHandle = true;

    if (!pIOHandle->InitFromPath(request))
    {
        SysPushErrorNested();
        return nullptr;
    }

    auto pFileStream = _new NtAsyncFileStream();
    if (!pFileStream)
    {
        SysPushErrorMemory();
        return nullptr;
    }

    pFileStream->Init(pIOHandle);
    return pFileStream;
}
// Destroys a stream previously returned by OpenAsyncNew.
AUKN_SYM void OpenAsyncRelease(IAsyncFileStream *handle)
{
    // The pointer is deleted as the concrete NtAsyncFileStream type.
    AuSafeDelete<NtAsyncFileStream *>(handle);
}
// Wraps an existing IO handle in a new async file stream.
// Returns a stream to be released with OpenAsyncFromSharedHandleRelease,
// or nullptr when the allocation fails.
AUKN_SYM IAsyncFileStream *OpenAsyncFromSharedHandleNew(const AuSPtr<IIOHandle> &pIOHandle)
{
    if (auto pFileStream = _new NtAsyncFileStream())
    {
        pFileStream->Init(pIOHandle);
        return pFileStream;
    }

    SysPushErrorMemory();
    return nullptr;
}
// Destroys a stream previously returned by OpenAsyncFromSharedHandleNew.
AUKN_SYM void OpenAsyncFromSharedHandleRelease(IAsyncFileStream *handle)
{
    // The pointer is deleted as the concrete NtAsyncFileStream type.
    AuSafeDelete<NtAsyncFileStream *>(handle);
}
}