/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: Async.NT.cpp Date: 2021-9-13 Author: Reece ***/ #include #include "FS.hpp" #include //#include "IPCHandle.hpp" #include "Async.NT.hpp" #include "FileAdvisory.NT.hpp" #include #include namespace Aurora::IO::FS { struct NtAsyncFileTransactionLoopSource : AuLoop::LSHandle { NtAsyncFileTransactionLoopSource(AuSPtr that) : caller_(that), Loop::LSHandle(AuUInt(that->event)) {} virtual bool IsSignaled() override; virtual bool OnTrigger(AuUInt handle) override; virtual AuLoop::ELoopSource GetType() override; private: AuWPtr caller_; }; bool NtAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle) { auto lock = caller_.lock(); if (lock) { return lock->Complete(); } return true; } Loop::ELoopSource NtAsyncFileTransactionLoopSource::GetType() { return Loop::ELoopSource::eSourceAIO; } bool NtAsyncFileTransactionLoopSource::IsSignaled() { auto lock = caller_.lock(); if (lock) { return lock->Complete(); } return LSHandle::IsSignaled(); } NtAsyncFileTransaction::~NtAsyncFileTransaction() { AuWin32CloseHandle(this->event); } AuSPtr NtAsyncFileStream::GetHandle() { return this->pHandle_; } void NtAsyncFileStream::Init(const AuSPtr &pHandle) { this->pHandle_ = pHandle; } AuSPtr NtAsyncFileStream::NewTransaction() { auto shared = AuMakeShared(); if (!shared) { return {}; } if (!shared->InitWeak(this->pHandle_)) { return {}; } return shared; } bool NtAsyncFileStream::BlockingTruncate(AuUInt64 length) { LARGE_INTEGER i {}; i.QuadPart = length; auto hHandle = (HANDLE)this->pHandle_->GetOSHandle(); if (!SetFilePointerEx(hHandle, i, nullptr, FILE_BEGIN)) { SysPushErrorIO(); return false; } return SetEndOfFile(hHandle); } bool NtAsyncFileStream::BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite ¶meters) { LARGE_INTEGER i {}; i.QuadPart = offset; auto hOptSafe = this->pHandle_->GetOSReadHandleSafe(); if (!hOptSafe) { return false; } auto hHandle = (HANDLE)hOptSafe.Value(); if (!SetFilePointerEx(hHandle, i, nullptr, FILE_BEGIN)) { SysPushErrorIO(); return false; } OVERLAPPED a {}; a.hEvent = CreateEventA(NULL, true, 0, NULL); DWORD read; if (!::ReadFile(hHandle, parameters.ptr, parameters.length, NULL, &a) && ::GetLastError() != ERROR_IO_PENDING) { SysPushErrorIO(); ::CloseHandle(a.hEvent); return false; } ::WaitForSingleObject(a.hEvent, 0); if (!::GetOverlappedResult(hHandle, &a, &read, true)) { ::CloseHandle(a.hEvent); return false; } ::CloseHandle(a.hEvent); parameters.outVariable = read; return true; } bool NtAsyncFileStream::BlockingWrite(AuUInt64 offset, const Memory::MemoryViewStreamRead ¶meters) { LARGE_INTEGER i {}; i.QuadPart = offset; auto hOptSafe = this->pHandle_->GetOSWriteHandleSafe(); if (!hOptSafe) { return false; } auto hHandle = (HANDLE)hOptSafe.Value(); if (!SetFilePointerEx(hHandle, i, nullptr, FILE_BEGIN)) { SysPushErrorIO(); return false; } OVERLAPPED a {}; a.hEvent = CreateEventA(NULL, true, 0, NULL); DWORD read; if (!::WriteFile(hHandle, parameters.ptr, parameters.length, NULL, &a) && ::GetLastError() != ERROR_IO_PENDING) { SysPushErrorIO(); ::CloseHandle(a.hEvent); return false; } ::WaitForSingleObject(a.hEvent, 0); if (!::GetOverlappedResult(hHandle, &a, &read, true)) { ::CloseHandle(a.hEvent); return false; } ::CloseHandle(a.hEvent); parameters.outVariable = read; return true; } bool NtAsyncFileTransaction::InitWeak(const AuSPtr &handle) { this->wpHandle_ = handle; this->overlap.hEvent = this->event = CreateEventW(nullptr, true, false, nullptr); return this->overlap.hEvent != INVALID_HANDLE_VALUE; } bool NtAsyncFileTransaction::Init(const AuSPtr &handle) { this->pHandle_ = handle; this->overlap.hEvent = this->event = CreateEventW(nullptr, true, false, nullptr); return this->overlap.hEvent != INVALID_HANDLE_VALUE; } void NtAsyncFileTransaction::ResetAIO() { auto hEvent = this->overlap.hEvent; AuResetMember(this->overlap); this->overlap.hEvent = hEvent; } static bool TranslateNtStatus(NtAsyncFileTransaction *that, BOOL val) { auto er = GetLastError(); if (val) { //(void)that->Complete(); return true; } else if (er == ERROR_IO_PENDING) { return true; } else if (er == ERROR_BROKEN_PIPE || er == ERROR_HANDLE_EOF) { SetEvent(that->event); // also required: that->bHasFailed = true; // to pass completion that->dwOsErrorCode = er; // to suppress actual error condition auto pipe = AuTryLockMemoryType(that->pNtIpcPipeImpl); that->DispatchCb(0); if (pipe) { pipe->OnEndOfReadStream(); } that->pMemoryHold.reset(); that->pPin.reset(); return true; } else { that->pPin.reset(); that->Reset(); that->dwOsErrorCode = er; that->bHasFailed = true; SysPushErrorFIO("QoA async FIO error: {} {}", /*that->GetFileHandle()->path*/ "", that->dwOsErrorCode); return false; } } static void WINAPI FileOperationCompletion( DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) { auto transaction = reinterpret_cast(reinterpret_cast(lpOverlapped) - offsetof(NtAsyncFileTransaction, overlap)); auto hold = AuExchange(transaction->pPin, {}); if (!hold) { return; } if (dwErrorCode) { hold->bHasFailed = true; hold->dwOsErrorCode = dwErrorCode; } else if (!hold->dwLastAbstractStat) { return; } SetEvent(lpOverlapped->hEvent); auto pStupid = AuExchange(hold->pMemoryHold, {}); hold->CompleteEx(dwNumberOfBytesTransfered, true); } bool NtAsyncFileTransaction::IDontWannaUsePorts() { if (AuExchange(this->pPin, AuSharedFromThis())) { while (SleepEx(0, true) == WAIT_IO_COMPLETION) { } if (AuExchange(this->pPin, AuSharedFromThis())) { SysPushErrorUnavailableError(); return {}; } } return true; } bool NtAsyncFileTransaction::StartRead(AuUInt64 offset, const AuSPtr &memoryView) { if (this->isIrredeemable_) { SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object"); return false; } if (!IDontWannaUsePorts()) { return false; } if (!memoryView) { SysPushErrorArg(); return {}; } if (this->pMemoryHold) { SysPushErrorIO("IO Operation in progress"); return {}; } auto pHandle = this->GetFileHandle(); auto optRead = pHandle->GetOSReadHandleSafe(); if (!optRead) { return false; } this->bLatch = false; ::ResetEvent(this->event); this->pMemoryHold = memoryView; this->bHasFailed = false; this->dwLastAbstractStat = memoryView->length; this->qwLastAbstractOffset = offset; this->dwLastBytes = 0; this->ResetAIO(); offset += this->uBaseOffset; this->overlap.Offset = AuBitsToLower(offset); this->overlap.OffsetHigh = AuBitsToHigher(offset); auto ret = ::ReadFileEx((HANDLE)optRead.value(), memoryView->ptr, memoryView->length, &this->overlap, FileOperationCompletion); return TranslateNtStatus(this, ret); } bool NtAsyncFileTransaction::StartWrite(AuUInt64 offset, const AuSPtr &memoryView) { if (this->isIrredeemable_) { SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object"); return false; } if (!IDontWannaUsePorts()) { return false; } if (!memoryView) { SysPushErrorArg(); return {}; } if (this->pMemoryHold) { SysPushErrorIO("IO Operation in progress"); return {}; } auto pHandle = this->GetFileHandle(); auto optWrite = pHandle->GetOSWriteHandleSafe(); if (!optWrite) { return false; } this->bLatch = false; ::ResetEvent(this->event); this->bHasFailed = false; this->pMemoryHold = memoryView; this->dwLastBytes = 0; this->dwLastAbstractStat = memoryView->length; this->qwLastAbstractOffset = offset; this->ResetAIO(); offset += this->uBaseOffset; this->overlap.Offset = AuBitsToLower(offset); this->overlap.OffsetHigh = AuBitsToHigher(offset); auto ret = ::WriteFileEx((HANDLE)optWrite.value(), memoryView->ptr, memoryView->length, &this->overlap, FileOperationCompletion); return TranslateNtStatus(this, ret); } void NtAsyncFileTransaction::DispatchCb(AuUInt32 read) { this->dwLastAbstractStat = 0; this->dwLastBytes = read; if (AuExchange(this->bLatch, true)) { return; } this->dwLastBytes = read; if (this->pSub_) { this->pSub_->OnAsyncFileOpFinished(this->qwLastAbstractOffset, read); } } void NtAsyncFileTransaction::Reset() { if (this->dwLastAbstractStat) { this->isIrredeemable_ = true; this->bHasFailed = true; auto hOptSafe = this->GetFileHandle()->GetOSReadHandleSafe(); if (hOptSafe) { auto hHandle = (HANDLE)hOptSafe.Value(); if (pCancelIoEx) { pCancelIoEx(hHandle, &this->overlap); } else { ::CancelIo(hHandle); } } ::SetEvent(this->event); this->dwOsErrorCode = ERROR_ABANDONED_WAIT_0; } else { ::ResetEvent(this->event); this->bHasFailed = false; } this->dwLastBytes = 0; this->dwLastAbstractStat = 0; } bool NtAsyncFileTransaction::Failed() { return this->bHasFailed && this->dwOsErrorCode != ERROR_BROKEN_PIPE && this->dwOsErrorCode != ERROR_HANDLE_EOF; } AuUInt NtAsyncFileTransaction::GetOSErrorCode() { return this->bHasFailed ? this->dwOsErrorCode : ERROR_SUCCESS; } bool NtAsyncFileTransaction::CompleteEx(AuUInt completeRoutine, bool bForce) { DWORD read {}; this->dwLastAbstractStat = 0; if (this->isIrredeemable_) { ::ResetEvent(this->event); return true; } if (!completeRoutine) { auto hOptSafe = this->GetFileHandle()->GetOSReadHandleSafe(); if (hOptSafe) { auto hHandle = (HANDLE)hOptSafe.Value(); if (::GetOverlappedResult(hHandle, &this->overlap, &read, false) && (read || bForce)) { SetEvent(this->overlap.hEvent); DispatchCb(read); return true; } } } else { if (this->dwOsErrorCode == ERROR_BROKEN_PIPE || this->dwOsErrorCode == ERROR_HANDLE_EOF) { auto pPipe = AuTryLockMemoryType(this->pNtIpcPipeImpl); DispatchCb(0); if (pPipe) { pPipe->OnEndOfReadStream(); } } else { DispatchCb(completeRoutine); } return true; } return bool(this->dwLastBytes) || this->bHasFailed; } bool NtAsyncFileTransaction::Complete() { return CompleteEx(false); } bool NtAsyncFileTransaction::HasCompleted() { return bool(this->dwLastBytes) || this->bHasFailed; } AuUInt32 NtAsyncFileTransaction::GetLastPacketLength() { return this->dwLastBytes; } void NtAsyncFileTransaction::SetCallback(const AuSPtr &sub) { this->pSub_ = sub; } void NtAsyncFileTransaction::SetBaseOffset(AuUInt64 uBaseOffset) { this->uBaseOffset = uBaseOffset; } bool NtAsyncFileTransaction::Wait(AuUInt32 timeout) { if (this->bLatch) { return true; } DWORD ret; do { ret = ::WaitForSingleObjectEx(this->event, timeout ? timeout : INFINITE, true); } while (ret == WAIT_IO_COMPLETION); if (ret == WAIT_OBJECT_0) { return Complete(); } return false; } HANDLE NtAsyncFileTransaction::GetHandle() { return this->event; } AuSPtr NtAsyncFileTransaction::GetFileHandle() { return this->pHandle_ ? this->pHandle_ : AuTryLockMemoryType(this->wpHandle_); } AuSPtr NtAsyncFileTransaction::NewLoopSource() { return AuMakeShared(AuSharedFromThis()); } AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock) { auto pHandle = AuIO::IOHandleShared(); if (!pHandle) { SysPushErrorMemory(); return nullptr; } AuIO::IIOHandle::HandleCreate createhandle(path); createhandle.eAdvisoryLevel = lock; createhandle.eMode = openMode; createhandle.bFailIfNonEmptyFile = false; createhandle.bDirectIOMode = directIO; createhandle.bAsyncHandle = true; if (!pHandle->InitFromPath(createhandle)) { SysPushErrorNested(); return nullptr; } auto pStream = _new NtAsyncFileStream(); if (!pStream) { SysPushErrorMemory(); return nullptr; } pStream->Init(pHandle); return pStream; } AUKN_SYM void OpenAsyncRelease(IAsyncFileStream *handle) { AuSafeDelete(handle); } }