[+] Always expand directory tree when attempting to copy or move files, just like every other FIO create operation.

[*] ~~Fix improper yield BlockUntilComplete on NewFence~~, kind of, not really. Wasn't really a bug. Might've introduced more overhead. Might revert.
This commit is contained in:
Reece Wilson 2023-01-23 21:18:58 +00:00
parent d780df6ceb
commit d4dfe22c6c
6 changed files with 76 additions and 43 deletions

View File

@ -50,6 +50,11 @@ namespace Aurora::Async
// internal pool interface
bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
return WaitFor(WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, unlocker }, primitive, timeoutMs);
}
bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
auto curThread = GetThreadState();
@ -60,7 +65,8 @@ namespace Aurora::Async
bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
if ((unlocker.first == curThread->id.first) && // work group matches
if ((unlocker.first == curThread->id.first) &&
(unlocker.pool.get() == this) && // work group matches
(workerIdMatches)) // well, crap
{
@ -222,7 +228,9 @@ namespace Aurora::Async
bool ThreadPool::Run()
{
bool ranOnce {};
gCurrentPool = AuWeakFromThis();
auto auThread = AuThreads::GetThread();
auto job = GetThreadState();
@ -675,18 +683,20 @@ namespace Aurora::Async
return this->shuttingdown_ || GetThreadState()->exiting;
}
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool bSupportsBlocking)
{
// Error pass-through
if (!task)
{
return {};
}
return AuMakeShared<WorkItem>(this, worker, task, supportsBlocking);
return AuMakeShared<WorkItem>(this, WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, worker }, task, bSupportsBlocking);
}
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
return AuMakeShared<WorkItem>(this, WorkerId_t{}, AuSPtr<IWorkItemHandler>{}, true);
return AuMakeShared<WorkItem>(this, AuAsync::GetCurrentWorkerPId(), AuSPtr<IWorkItemHandler>{}, true);
}
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
@ -908,10 +918,9 @@ namespace Aurora::Async
{
AU_LOCK_GUARD(rwlock_->AsWritable());
if (GetCurrentWorkerPId().pool && create)
if (create)
{
SysPushErrorGeneric("TODO (reece): add support for multiple runners per thread");
return {};
gCurrentPool = AuSharedFromThis();
}
AuSPtr<GroupState> group;

View File

@ -28,41 +28,42 @@ namespace Aurora::Async
ThreadPool();
// IThreadPoolInternal
bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) override;
IThreadPool *ToThreadPool() override;
void IncrementTasksRunning() override;
void DecrementTasksRunning() override;
bool WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms);
bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms) override;
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable) override;
IThreadPool *ToThreadPool() override;
void IncrementTasksRunning() override;
void DecrementTasksRunning() override;
// IThreadPool
virtual bool Spawn(WorkerId_t workerId) override;
virtual bool Spawn(WorkerId_t workerId) override;
virtual void SetRunningMode(bool eventRunning) override;
virtual void SetRunningMode(bool eventRunning) override;
virtual bool Create(WorkerId_t workerId) override;
virtual bool Create(WorkerId_t workerId) override;
virtual bool InRunnerMode() override;
virtual bool InRunnerMode() override;
virtual bool Poll() override;
virtual bool RunOnce() override;
virtual bool Run() override;
virtual void Shutdown() override;
virtual bool Exiting() override;
virtual bool Poll() override;
virtual bool RunOnce() override;
virtual bool Run() override;
virtual void Shutdown() override;
virtual bool Exiting() override;
virtual AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking) override;
virtual AuSPtr<IWorkItem> NewFence() override;
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
virtual WorkerId_t GetCurrentThread() override;
virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
virtual WorkerId_t GetCurrentThread() override;
virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
virtual void Signal(WorkerId_t workerId) override;
virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
virtual void SyncAllSafe() override;
virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
virtual void Signal(WorkerId_t workerId) override;
virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
virtual void SyncAllSafe() override;
virtual void AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) override;

View File

@ -13,9 +13,13 @@
namespace Aurora::Async
{
WorkItem::WorkItem(IThreadPoolInternal *owner, const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking) : worker_(worker), task_(task), owner_(owner)
WorkItem::WorkItem(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
const AuSPtr<IWorkItemHandler> &task,
bool bSupportsBlocking) :
worker_(worker), task_(task), owner_(owner)
{
if (supportsBlocking)
if (bSupportsBlocking)
{
this->finishedEvent_ = AuThreadPrimitives::EventUnique(false, true, true);
SysAssert(this->finishedEvent_);
@ -402,7 +406,7 @@ namespace Aurora::Async
return {};
}
return AuMakeShared<WorkItem>(GetWorkerInternal(), worker, task, supportsBlocking);
return AuMakeShared<WorkItem>(GetWorkerInternal(), WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, worker }, task, supportsBlocking);
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
@ -424,7 +428,7 @@ namespace Aurora::Async
AUKN_SYM AuSPtr<IWorkItem> NewFence()
{
return AuMakeShared<WorkItem>(GetWorkerInternal(), WorkerId_t{}, AuSPtr<IWorkItemHandler>{}, true);
return AuMakeShared<WorkItem>(GetWorkerInternal(), AuAsync::GetCurrentWorkerPId(), AuSPtr<IWorkItemHandler>{}, true);
}
void *WorkItem::GetPrivateData()

View File

@ -11,10 +11,15 @@
namespace Aurora::Async
{
class WorkItem : public IWorkItem, public IAsyncRunnable, public AuEnableSharedFromThis<WorkItem>
struct WorkItem :
IWorkItem,
IAsyncRunnable,
AuEnableSharedFromThis<WorkItem>
{
public:
WorkItem(IThreadPoolInternal *owner, const WorkerId_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking);
WorkItem(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
const AuSPtr<IWorkItemHandler> &task,
bool bSupportsBlocking);
~WorkItem();
AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) override;
@ -54,7 +59,7 @@ namespace Aurora::Async
void DispatchExLocked(bool check);
AuSPtr<IWorkItemHandler> task_;
WorkerId_t worker_;
WorkerPId_t worker_;
float prio_ = 0.5f;
AuList<AuSPtr<IWorkItem>> waitOn_;
AuList<AuSPtr<IWorkItem>> waiters_;

View File

@ -359,7 +359,12 @@ namespace Aurora::IO::FS
{
try
{
return MoveFileW(Locale::ConvertFromUTF8(NormalizePathRet(src)).c_str(), Locale::ConvertFromUTF8(NormalizePathRet(dest)).c_str());
auto srcPathNormalized = NormalizePathRet(src);
auto destPathNormalized = NormalizePathRet(dest);
CreateDirectories(destPathNormalized, true);
return MoveFileW(Locale::ConvertFromUTF8(srcPathNormalized).c_str(),
Locale::ConvertFromUTF8(destPathNormalized).c_str());
}
catch (...)
{
@ -371,7 +376,12 @@ namespace Aurora::IO::FS
{
try
{
return CopyFileW(Locale::ConvertFromUTF8(NormalizePathRet(src)).c_str(), Locale::ConvertFromUTF8(NormalizePathRet(dest)).c_str(), true);
auto srcPathNormalized = NormalizePathRet(src);
auto destPathNormalized = NormalizePathRet(dest);
CreateDirectories(destPathNormalized, true);
return CopyFileW(Locale::ConvertFromUTF8(srcPathNormalized).c_str(),
Locale::ConvertFromUTF8(destPathNormalized).c_str(), true);
}
catch (...)
{

View File

@ -319,7 +319,9 @@ namespace Aurora::IO::FS
AUKN_SYM bool Relink(const AuString &src, const AuString &dest)
{
return rename(NormalizePathRet(src).c_str(), NormalizePathRet(dest).c_str()) != -1;
auto normalizedDestPath = NormalizePathRet(dest);
CreateDirectories(destPathNormalized, true);
return rename(NormalizePathRet(src).c_str(), destPathNormalized.c_str()) != -1;
}
#if defined(AURORA_IS_LINUX_DERIVED)
@ -364,6 +366,8 @@ namespace Aurora::IO::FS
auto normalizedSrcPath = NormalizePathRet(src);
auto normalizedDestPath = NormalizePathRet(dest);
CreateDirectories(normalizedDestPath, true);
int input, output;
if ((input = ::open(normalizedSrcPath.c_str(), O_RDONLY | O_CLOEXEC)) == -1)