AuroraRuntime/Source/IO/FS/Watcher.NT.cpp
Reece 7fde7d04fb [+] EPipeCallbackType
[+] IIOBufferedStreamAvailable callback
[+] IIOProcessor singleshot work items / IIOProcessorWorkUnit
[+] IOPipeCallback description of a pipes destination
[+] IOPipeInputData description of a pipes source
[+] IOPipeRequest, IOPipeRequestAIO, IOPipeRequestBasic
[+] IPipeBackend hooks for on start/end hooks of IOPipeRequestBasics
[*] Update IOAdapaterAsyncStream implementation to better support caller buffering
[*] Updated IAsyncStreamReader to include a warm/dequeue API for direct async usage
[*] Fix NT IO regressions
[*] Fix ThreadPool shutdown on an unregistered thread
[*] Fix race condition in Async.NT.cpp & fix signalable state to closely match Linux (dunno how this was passing before)
[*] Refactor IOProcessorWorkUnit -> IIOProcessorWorkUnit
[*] Update experimental header to include the changes
2022-06-21 05:49:36 +01:00

661 lines
18 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Watcher.NT.cpp
Date: 2022-4-1
Author: Reece
Note: FindFirstChangeNotification locks the parent directory, every other library is garbage
Aurora shall implement this sewage instead
***/
#include <Source/RuntimeInternal.hpp>
#include "FS.hpp"
#include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
#include "winioctl.h"
namespace Aurora::IO::FS
{
struct NTWatchObject;
struct NTWatcher;
struct NTWatchhandle
{
NTWatcher *object;
};
struct NTEvent : virtual AuLoop::LSEvent
{
NTEvent(AuSPtr<NTWatchhandle> parent);
bool IsSignaled() override;
Loop::ELoopSource GetType() override;
void OnPresleep() override;
bool OnTrigger(AuUInt handle) override;
private:
AuWPtr<NTWatchhandle> parent_;
};
struct NTWatchObject
{
OVERLAPPED ntOverlapped {};
bool bBroken {};
bool bReschedule {};
NTWatcher *parent;
AuString strBaseDir;
HANDLE hFileHandle {INVALID_HANDLE_VALUE};
AuUInt32 dwReferences {};
~NTWatchObject()
{
Cancel();
}
bool Init(const AuString &usrStr);
bool ScheduleOnce();
bool CheckBroken();
void Cancel();
private:
REQUEST_OPLOCK_OUTPUT_BUFFER whoAsked_;
};
struct NTCachedPath
{
AuString strNormalizedPath;
AuString strTheCakeIsALie;
AuSPtr<UserWatchData> userData;
AuSPtr<NTWatchObject> watcher;
bool bIsDirectory {};
AuList<AuTuple<AuString, bool, AuUInt64>> lastTick;
FILETIME lastFileTime {};
AuList<AuTuple<AuString, bool, AuUInt64>> GetCurrentState();
bool CheckDirDelta(NTWatcher *parent);
void Warm();
bool CheckRun(NTWatcher *parent);
bool AddEvent(NTWatcher *parent, EWatchEvent type, const AuString &path);
};
struct NTWatcher : IWatcher
{
virtual bool AddWatch(const WatchRequest &file) override;
virtual bool RemoveByName(const AuString &path) override;
virtual bool RemoveByPrivateContext(const AuSPtr<UserWatchData> &file) override;
virtual AuSPtr<AuLoop::ILoopSource> AsLoopSource() override;
virtual AuList<WatchEvent> QueryUpdates() override;
bool Init();
bool GoBrr();
private:
// we don't expect to yield to another thread to hit our vector
// these apis are generally expected to be called from a single thread
AuThreadPrimitives::SpinLock spinlock_;
AuBST<AuString, AuWPtr<NTWatchObject>> cachedWatchers_;
public:
AuList<WatchEvent> triggered_;
AuSPtr<NTEvent> ntEvent_;
AuList<NTCachedPath> paths_;
private:
AuSPtr<NTWatchhandle> watchHandler_;
};
// NT Event class
NTEvent::NTEvent(AuSPtr<NTWatchhandle> parent) : LSEvent(false, true /*[1]*/, true), parent_(parent)
{
// [1] Functions such as GetOverlappedResult and the synchronization wait functions reset auto-reset events to the nonsignaled state. Therefore, you should use a manual reset event; if you use an auto-reset event, your application can stop responding if you wait for the operation to complete and then call GetOverlappedResult with the bWait parameter set to TRUE.
// - https://docs.microsoft.com/en-us/windows/win32/api/minwinbase/ns-minwinbase-overlapped
}
bool NTEvent::IsSignaled()
{
return LSEvent::IsSignaled();
}
Loop::ELoopSource NTEvent::GetType()
{
return Loop::ELoopSource::eSourceFileWatcher;
}
// Event type is latching, sort of like binary semaphore
// Resignal if work is still available
void NTEvent::OnPresleep()
{
if (auto watcher = parent_.lock())
{
if (watcher->object->triggered_.size())
{
Set();
}
}
}
// Filter the latching events signal state once
// based on the availability of work
// ...
/// also, this is kind of like a tick
bool NTEvent::OnTrigger(AuUInt handle)
{
bool ret {};
if (auto watcher = parent_.lock())
{
ret = watcher->object->GoBrr();
}
else
{
ret = true;
}
return ret;
}
// NT Cached Path / File Watch Item class
void NTCachedPath::Warm()
{
HANDLE hFile;
hFile = CreateFileW(AuLocale::ConvertFromUTF8(this->strNormalizedPath).c_str(),
GENERIC_READ,
FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | (this->bIsDirectory ? FILE_FLAG_BACKUP_SEMANTICS : 0),
NULL);
if (hFile != INVALID_HANDLE_VALUE)
{
GetFileTime(hFile, NULL, NULL, &this->lastFileTime);
CloseHandle(hFile);
}
if (this->bIsDirectory)
{
this->lastTick = GetCurrentState();
}
}
AuList<AuTuple<AuString, bool, AuUInt64>> NTCachedPath::GetCurrentState()
{
WIN32_FIND_DATAW ffd;
HANDLE hFind {INVALID_HANDLE_VALUE};
AuList<AuTuple<AuString, bool, AuUInt64>> ret;
ret.reserve(this->lastTick.size());
hFind = FindFirstFileW(AuLocale::ConvertFromUTF8(this->strNormalizedPath + "\\*").c_str(), &ffd);
if (INVALID_HANDLE_VALUE == hFind)
{
SysPushErrorIO();
return this->lastTick;
}
do
{
if (ffd.cFileName == std::wstring(L".") ||
ffd.cFileName == std::wstring(L".."))
{
continue;
}
AuTryInsert(ret, AuMakeTuple(AuLocale::ConvertFromWChar(ffd.cFileName), ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY, AuUInt64(ffd.ftLastWriteTime.dwHighDateTime) << AuUInt64(32) | AuUInt64(ffd.ftLastWriteTime.dwLowDateTime)));
}
while (FindNextFileW(hFind, &ffd) != 0);
FindClose(hFind);
return ret;
}
bool NTCachedPath::CheckDirDelta(NTWatcher *parent)
{
bool bDidAThing {false};
auto next = GetCurrentState();
auto currentState = next;
auto oldState = AuExchange(this->lastTick, AuMove(next));
for (auto &old : oldState)
{
if (!AuExists(currentState, old))
{
// FILE WAS DELETED
bDidAThing |= AddEvent(parent, EWatchEvent::eFileDelete, this->strTheCakeIsALie + "\\" + AuGet<0>(old));
}
}
for (auto &newItem : currentState)
{
auto oldItem = AuTryFindByTupleN<0>(oldState, AuGet<0>(newItem));
if (oldItem == oldState.end())
{
// FILE WAS ADDED
bDidAThing |= AddEvent(parent, EWatchEvent::eFileCreate, this->strTheCakeIsALie + "\\" + AuGet<0>(newItem));
}
else if (AuGet<2>(newItem) != AuGet<2>(*oldItem))
{
// FILE WAS MODIFIED
bDidAThing |= AddEvent(parent, EWatchEvent::eFileModify, this->strTheCakeIsALie + "\\" + AuGet<0>(newItem));
}
}
return bDidAThing;
}
bool NTCachedPath::AddEvent(NTWatcher *parent, EWatchEvent type, const AuString &path)
{
AuCtorCode_t code;
auto watchedFile = AuTryConstruct<WatchedFile>(code, this->userData, this->strTheCakeIsALie);
if (!code)
{
return false;
}
WatchEvent event;
event.event = type;
event.watch = AuMove(watchedFile);
event.file = AuTryConstruct<AuString>(code, path);
if (!code)
{
return false;
}
if (!AuTryInsert(parent->triggered_, AuMove(event)))
{
return false;
}
return true;
}
bool NTCachedPath::CheckRun(NTWatcher *parent)
{
FILETIME curFileTime {};
HANDLE hFile;
hFile = CreateFileW(AuLocale::ConvertFromUTF8(this->strNormalizedPath).c_str(),
GENERIC_READ,
FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | (this->bIsDirectory ? FILE_FLAG_BACKUP_SEMANTICS : 0),
NULL);
if (hFile == INVALID_HANDLE_VALUE)
{
if (GetLastError() == ERROR_FILE_NOT_FOUND)
{
// NOTE: we might want to add a check to prevent double dispatch
return AddEvent(parent, EWatchEvent::eSelfDelete, this->strTheCakeIsALie);
}
else
{
return false;
}
}
bool bDirChanged = this->bIsDirectory ? CheckDirDelta(parent) : false;
if (!GetFileTime(hFile, NULL, NULL, &curFileTime))
{
CloseHandle(hFile);
return true;
}
bool ret = !((this->lastFileTime.dwLowDateTime == curFileTime.dwLowDateTime) &&
(this->lastFileTime.dwHighDateTime == curFileTime.dwHighDateTime));
this->lastFileTime = curFileTime;
CloseHandle(hFile);
if (!ret)
{
return bDirChanged;
}
// Send a self modified update once the timestamp changes
return AddEvent(parent,this->bIsDirectory ? EWatchEvent::eSelfModify : EWatchEvent::eFileModify, this->strTheCakeIsALie) || bDirChanged;
}
// Directory Watcher Object
bool NTWatchObject::Init(const AuString &usrStr)
{
AuCtorCode_t code;
this->strBaseDir = AuTryConstruct<AuString>(code, usrStr);
if (!code)
{
return false;
}
auto c = this->strBaseDir[this->strBaseDir.size() - 1];
if ((c == '\\') ||
(c == '/'))
{
this->strBaseDir.pop_back();
}
this->hFileHandle = CreateFileW(AuLocale::ConvertFromUTF8(this->strBaseDir.c_str()).c_str(),
GENERIC_READ,
FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED | FILE_FLAG_BACKUP_SEMANTICS,
NULL);
if (this->hFileHandle == INVALID_HANDLE_VALUE)
{
return false;
}
AuMemset(&this->ntOverlapped, 0, sizeof(this->ntOverlapped));
return ScheduleOnce();
}
// Request OP lock
bool NTWatchObject::ScheduleOnce()
{
bool firstTime = !this->ntOverlapped.hEvent;
this->ntOverlapped.hEvent = (HANDLE)this->parent->ntEvent_->GetHandle();
REQUEST_OPLOCK_INPUT_BUFFER input
{
REQUEST_OPLOCK_CURRENT_VERSION,
sizeof(REQUEST_OPLOCK_INPUT_BUFFER),
OPLOCK_LEVEL_CACHE_READ | OPLOCK_LEVEL_CACHE_HANDLE,
firstTime ? REQUEST_OPLOCK_INPUT_FLAG_REQUEST : REQUEST_OPLOCK_INPUT_FLAG_ACK,
};
DWORD bytesReturned;
if (DeviceIoControl(this->hFileHandle, FSCTL_REQUEST_OPLOCK, &input, sizeof(input), &whoAsked_, sizeof(whoAsked_), &bytesReturned, &this->ntOverlapped))
{
this->CheckBroken();
}
else
{
if (GetLastError() != ERROR_IO_PENDING)
{
SysPushErrorIO();
return false;
}
}
return true;
}
bool NTWatchObject::CheckBroken()
{
DWORD bytesTransferred;
if (this->bBroken || GetOverlappedResult(this->hFileHandle, &this->ntOverlapped, &bytesTransferred, false))
{
bool bSuccess {true};
this->bBroken = true;
bool bAnyTriggered {};
for (auto &filesWatched : this->parent->paths_)
{
if (!AuStartsWith(filesWatched.strNormalizedPath, this->strBaseDir))
{
continue;
}
if (!filesWatched.CheckRun(this->parent))
{
continue;
}
bAnyTriggered = true;
}
if (this->whoAsked_.Flags & REQUEST_OPLOCK_OUTPUT_FLAG_ACK_REQUIRED)
{
this->whoAsked_.Flags = 0;
bSuccess = ScheduleOnce();
}
this->bBroken = false;
return bSuccess;
}
return true;
}
void NTWatchObject::Cancel()
{
CancelIoEx(this->hFileHandle, &this->ntOverlapped);
AuWin32CloseHandle(this->hFileHandle);
}
// NT Watcher - primary interface implementation
bool NTWatcher::AddWatch(const WatchRequest &request)
{
AuCtorCode_t code;
AuSPtr<NTWatchObject> watcher;
NTCachedPath cached;
auto &file = request.watch;
AU_LOCK_GUARD(this->spinlock_);
AuString translated;
if (!AuIOFS::NormalizePath(translated, file.path))
{
translated = file.path;
}
// Create the NT path in the midst of path normalization
cached.strNormalizedPath = AuTryConstruct<AuString>(code, translated);
if (!code)
{
return false;
}
cached.strTheCakeIsALie = AuTryConstruct<AuString>(code, file.path);
if (!code)
{
return false;
}
cached.userData = file.userData;
// Continue normalizing the parent path
if (AuIOFS::FileExists(translated))
{
AuIOFS::GetDirectoryFromPath(translated, translated);
}
else
{
// dammit, had to move checkrun
cached.bIsDirectory = true;
}
// Update the last edited timestamp as a frame of reference for fast compare of
// directory entries upon lock breakage
cached.Warm();
// Attempt to locate a watcher for the directoy
if (!cached.bIsDirectory)
{
for (const auto &[base, wptr] : this->cachedWatchers_)
{
if (AuStartsWith(translated, base))
{
watcher = wptr.lock();
break;
}
}
}
// Create one, if missing
if (!watcher)
{
auto item = AuMakeShared<NTWatchObject>();
if (!item)
{
SysPushErrorMem();
return false;
}
item->parent = this;
if (!item->Init(translated))
{
SysPushErrorGen();
return false;
}
watcher = item;
if (!AuTryInsert(this->cachedWatchers_, AuMove(translated), AuMove(item)))
{
return false;
}
}
cached.watcher = watcher;
// Append path / user private pair alongside the watcher for book keeping
if (!AuTryInsert(this->paths_, AuMove(cached)))
{
return false;
}
// Done
return true;
}
// Post-event trigger processor and filterer
bool NTWatcher::GoBrr()
{
AU_LOCK_GUARD(this->spinlock_);
for (auto &[dir, weakWatcher] : this->cachedWatchers_)
{
if (auto watcher = weakWatcher.lock())
{
if (!watcher->CheckBroken())
{
this->ntEvent_->Set();
}
}
}
return this->triggered_.size();
}
bool NTWatcher::RemoveByName(const AuString &path)
{
AU_LOCK_GUARD(this->spinlock_);
AuString strNormalized;
if (!AuIOFS::NormalizePath(strNormalized, path))
{
return false;
}
return AuRemoveIf(this->paths_, [&](const NTCachedPath &object) -> bool
{
if ((strNormalized == object.strNormalizedPath) ||
(strNormalized.empty() && object.strNormalizedPath == path))
{
return true;
}
return false;
});
}
bool NTWatcher::RemoveByPrivateContext(const AuSPtr<UserWatchData> &file)
{
AU_LOCK_GUARD(this->spinlock_);
return AuRemoveIf(this->paths_, [&](const NTCachedPath &object) -> bool
{
return file == object.userData;
});
}
bool NTWatcher::Init()
{
this->watchHandler_ = AuMakeShared<NTWatchhandle>();
if (!this->watchHandler_)
{
return false;
}
this->watchHandler_->object = this;
this->ntEvent_ = AuMakeShared<NTEvent>(this->watchHandler_);
if (!this->ntEvent_)
{
return false;
}
if (!this->ntEvent_->HasValidHandle())
{
return false;
}
return true;
}
AuSPtr<AuLoop::ILoopSource> NTWatcher::AsLoopSource()
{
return AuStaticCast<AuLoop::ILSEvent>(this->ntEvent_);
}
AuList<WatchEvent> NTWatcher::NTWatcher::QueryUpdates()
{
AsLoopSource()->IsSignaled(); // poke work queue
AU_LOCK_GUARD(this->spinlock_);
return AuExchange(this->triggered_, {});
}
AUKN_SYM IWatcher *NewWatcherNew()
{
auto watcher = _new NTWatcher();
if (!watcher)
{
return {};
}
if (!watcher->Init())
{
delete watcher;
return {};
}
return watcher;
}
AUKN_SYM void NewWatcherRelease(IWatcher *watcher)
{
AuSafeDelete<NTWatcher *>(watcher);
}
}