[+] IAsyncTimer

[+] IAsyncTimerCallback
[+] ETickType.hpp
[+] EWorkPriority.hpp
[+] static IThreadPool::GetSelfIOProcessor()
[+] static IThreadPool::GetSelfIONetInterface()
[+] static IThreadPool::GetSelfIONetWorker()
[-] [Source/Async/]AsyncRunnable.hpp
[*] Begin encapsulating WorkerPId_t
[*] WorkerPId_t no longer take strong pointers to prevent leaks given that these identifiers are copied and kept alive everywhere
This commit is contained in:
Reece Wilson 2023-12-07 09:20:23 +00:00
parent 367118ab0c
commit 8944d8bd16
30 changed files with 636 additions and 234 deletions

View File

@ -7,10 +7,15 @@
***/
#pragma once
#include "EWorkPriority.hpp"
#include "ETickType.hpp"
#include "AsyncTypes.hpp"
#include "IWorkItem.hpp"
#include "IWorkItemHandler.hpp"
#include "IAsyncTimer.hpp"
#include "IAsyncTimerCallback.hpp"
#include "IThreadPool.hpp"
#include "IAsyncApp.hpp"
@ -32,14 +37,18 @@ namespace Aurora::Async
*/
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task);
/**
* @brief
* @param worker
* @param func
* @return
*/
AUKN_SYM AuSPtr<IWorkItem> NewWorkFunction(const WorkerPId_t &worker, AuVoidFunc func);
AUKN_SYM AuSPtr<IAsyncTimer> NewTimer(AuUInt64 uPeriodNS,
const AuSPtr<IAsyncTimerCallback> &pCallback,
AuOptional<WorkerPId_t> workerPid,
AuOptional<AuUInt64> uStartTime);
AUKN_SYM AuSPtr<IAsyncTimer> NewTimer(AuUInt64 uPeriodNS,
const AuSupplierConsumer<bool, AuUInt64, AuUInt64, AuUInt64> &callback,
AuOptional<WorkerPId_t> workerPid,
AuOptional<AuUInt64> uStartTime);
static inline AuSPtr<IWorkItem> DispatchOn(const WorkerPId_t &worker, AuVoidFunc func)
{
auto pThat = NewWorkFunction(worker, func);

View File

@ -61,38 +61,66 @@ namespace Aurora::Async
}
};
struct WorkPriv
{
AuUInt32 magic;
};
struct WorkerPId_t : WorkerId_t
{
inline WorkerPId_t()
AU_COPY_MOVE_DEF(WorkerPId_t);
inline WorkerPId_t(const AuSPtr<IThreadPool> &pool,
ThreadGroup_t group) :
WorkerId_t(group, kThreadIdAny),
wpPool(pool)
{}
inline WorkerPId_t(const AuSPtr<IThreadPool> &pool, ThreadGroup_t group) : WorkerId_t(group, kThreadIdAny), pool(pool)
inline WorkerPId_t(const AuSPtr<IThreadPool> &pool,
ThreadGroup_t group, ThreadId_t id) :
WorkerId_t(group, id),
wpPool(pool)
{}
inline WorkerPId_t(const AuSPtr<IThreadPool> &pool, ThreadGroup_t group, ThreadId_t id) : WorkerId_t(group, id), pool(pool)
{}
inline WorkerPId_t(const AuSPtr<IThreadPool> &pool, const WorkerId_t &cpy) : WorkerId_t(cpy.first, cpy.second), pool(pool)
inline WorkerPId_t(const AuSPtr<IThreadPool> &pool,
const WorkerId_t &cpy) :
WorkerId_t(cpy.first, cpy.second),
wpPool(pool)
{}
inline WorkerPId_t(ThreadGroup_t group, ThreadId_t id) : WorkerId_t(group, id)
inline WorkerPId_t(ThreadGroup_t group, ThreadId_t id) :
WorkerId_t(group, id)
{ }
AuSPtr<IThreadPool> pool;
inline AuSPtr<IThreadPool> GetPool() const
{
return AuTryLockMemoryType(this->wpPool);
}
inline ThreadGroup_t GetGroup()
{
return this->first;
}
inline ThreadId_t GetRunnerId()
{
return this->second;
}
inline AuUInt HashCode() const noexcept
{
return AuHashCode(AuUInt(this->first) << 20 | AuUInt(this->second)) ^ AuHashCode(pool.get());
auto pPool = GetPool();
return AuHashCode(AuUInt(this->first) << 20 | AuUInt(this->second)) ^ (pPool ? AuHashCode(pPool.get()) : 0);
}
bool operator==(const WorkerPId_t & in) const noexcept
inline bool operator==(const WorkerPId_t &in) const noexcept
{
return in.pool == this->pool && in.first == this->first && this->second == in.second;
return in.GetPool() == this->GetPool() &&
in.first == this->first &&
this->second == in.second;
}
inline operator bool() const noexcept
{
return GetPool();
}
private:
AuWPtr<IThreadPool> wpPool;
};
}

View File

@ -0,0 +1,20 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ETickType.hpp
Date: 2023-12-06
Date: 2021-11-1
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
AUE_DEFINE(ETickType,
(
eFinished,
eRerun,
eSchedule,
eFailed
));
}

View File

@ -0,0 +1,20 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: EWorkPriority.hpp
Date: 2023-12-06
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
AUE_DEFINE(EWorkPriority,
(
ePriorityLowest,
ePriorityLow,
ePriorityNormal,
ePriorityHigh,
ePriorityHighest
));
}

View File

@ -0,0 +1,18 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IAsyncTimer.hpp
Date: 2023-12-06
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct IAsyncTimer
{
virtual void CancelTimer() = 0;
virtual AuUInt64 GetLastTime() = 0;
virtual AuUInt64 GetTicks() = 0;
};
}

View File

@ -0,0 +1,15 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IAsyncTimerCallback.hpp
Date: 2023-12-06
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
AUKN_INTERFACE(IAsyncTimerCallback,
AUI_METHOD(bool, OnTick, (AuUInt64, uTick, AuUInt64, uTimeLateNS, AuUInt64, uTimeNowNS))
);
}

View File

@ -510,7 +510,7 @@ namespace Aurora::Async
{
if (!this->notifier_)
{
this->notifier_ = this->currentThread_.pool->NewWorkItem(this->currentThread_, AuSPtr<IWorkItemHandler>(AuSharedFromThis(), &this->tickAdapter));
this->notifier_ = this->currentThread_.GetPool()->NewWorkItem(this->currentThread_, AuSPtr<IWorkItemHandler>(AuSharedFromThis(), &this->tickAdapter));
}
SysAssert(this->notifier_);
@ -522,7 +522,7 @@ namespace Aurora::Async
{
if (!this->startingBarrier_)
{
this->startingBarrier_ = GetCurrentWorkerPId().pool->NewFence();
this->startingBarrier_ = GetCurrentWorkerPId().GetPool()->NewFence();
}
if (!this->startingBarrier_)
@ -539,7 +539,7 @@ namespace Aurora::Async
{
this->currentThread_ = GetCurrentWorkerPId();
SysAssert(worker.pool);
SysAssert(worker);
if (worker == GetCurrentWorkerPId())
{
@ -554,7 +554,8 @@ namespace Aurora::Async
}
else
{
auto newItem = worker.pool->NewWorkItem(worker, AuSPtr<IWorkItemHandler>(AuSharedFromThis(), &this->startupAdapter));
auto newItem = worker.GetPool()->NewWorkItem(worker,
AuSPtr<IWorkItemHandler>(AuSharedFromThis(), &this->startupAdapter));
if (!newItem)
{
return {};

View File

@ -88,6 +88,10 @@ namespace Aurora::Async
virtual AuSPtr<IO::Net::INetInterface> GetIONetInterface(WorkerId_t id) = 0;
virtual AuSPtr<IO::Net::INetWorker> GetIONetWorker(WorkerId_t id) = 0;
static AuSPtr<IO::IIOProcessor> GetSelfIOProcessor();
static AuSPtr<IO::Net::INetInterface> GetSelfIONetInterface();
static AuSPtr<IO::Net::INetWorker> GetSelfIONetWorker();
// Synchronization
// Note: syncing to yourself will nullify requireSignal to prevent deadlock conditions
virtual bool Sync(WorkerId_t workerId,

View File

@ -9,14 +9,6 @@
namespace Aurora::Async
{
AUE_DEFINE(EWorkPrio, (
eLowestPrio,
eLowPrio,
eNormalPrio,
eHighPrio,
eHighestPrio
));
struct IWorkItem
{
virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &pWorkItem) = 0;
@ -50,7 +42,7 @@ namespace Aurora::Async
virtual AuSPtr<IWorkItem> Dispatch() = 0;
virtual void SetPrio(EWorkPrio prio) = 0;
virtual void SetPrio(EWorkPriority prio) = 0;
virtual bool BlockUntilComplete() = 0;
virtual bool HasFinished() = 0;

View File

@ -9,14 +9,6 @@
namespace Aurora::Async
{
AUE_DEFINE(ETickType,
(
eFinished,
eRerun,
eSchedule,
eFailed
));
struct IWorkItemHandler
{
struct ProcessInfo

View File

@ -9,7 +9,6 @@
#include "AuGroupState.hpp"
#include "ThreadState.hpp"
#include "AsyncRunnable.hpp"
namespace Aurora::Async
{

View File

@ -1,81 +0,0 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AsyncRunnable.hpp
Date: 2021-11-2
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct IAsyncRunnable
{
inline virtual EWorkPrio GetPrio()
{
return EWorkPrio::eNormalPrio;
};
virtual void RunAsync() = 0;
virtual void CancelAsync() {}
inline virtual AuOptional<AuPair<AuUInt32, AuUInt32>> QueryFences()
{
return {};
}
};
struct AsyncFuncRunnable : IAsyncRunnable
{
AuFunction<void()> callback;
AuFunction<void()> fail;
AuThreadPrimitives::Mutex lock;
AsyncFuncRunnable(AuFunction<void()> &&callback) : callback(AuMove(callback))
{}
AsyncFuncRunnable(AuFunction<void()> &&callback, AuFunction<void()> &&fail) : callback(AuMove(callback)), fail(AuMove(fail))
{}
AsyncFuncRunnable(const AuFunction<void()> &callback) : callback(callback)
{}
AsyncFuncRunnable(const AuFunction<void()> &callback, const AuFunction<void()> &fail) : callback(callback), fail(fail)
{}
void RunAsync() override
{
AU_LOCK_GUARD(lock);
SysAssertDbgExp(callback, "Missing callback function");
try
{
callback();
}
catch (...)
{
Debug::PrintError();
}
fail = {};
callback = {};
}
void CancelAsync() override
{
AU_LOCK_GUARD(lock);
if (fail)
{
try
{
fail();
}
catch (...)
{
Debug::PrintError();
}
}
fail = {};
callback = {};
}
};
}

View File

@ -0,0 +1,70 @@
/***
Copyright (C) 2021-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncFuncRunnable.cpp
Date: 2023-12-06
Date: 2021-11-2
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "IAsyncRunnable.hpp"
#include "AuAsyncFuncRunnable.hpp"
namespace Aurora::Async
{
AsyncFuncRunnable::AsyncFuncRunnable(AuFunction<void()> &&callback) :
callback(AuMove(callback))
{ }
AsyncFuncRunnable::AsyncFuncRunnable(AuFunction<void()> &&callback, AuFunction<void()> &&fail) :
callback(AuMove(callback)), fail(AuMove(fail))
{ }
AsyncFuncRunnable::AsyncFuncRunnable(const AuFunction<void()> &callback) :
callback(callback)
{ }
AsyncFuncRunnable::AsyncFuncRunnable(const AuFunction<void()> &callback, const AuFunction<void()> &fail) :
callback(callback),
fail(fail)
{ }
void AsyncFuncRunnable::RunAsync()
{
AU_LOCK_GUARD(this->lock);
try
{
if (auto callback = AuExchange(this->callback, {}))
{
callback();
}
}
catch (...)
{
SysPushErrorCatch();
this->CancelAsync();
}
this->fail = {};
}
void AsyncFuncRunnable::CancelAsync()
{
AU_LOCK_GUARD(this->lock);
try
{
if (auto callback = AuExchange(this->fail, {}))
{
callback();
}
}
catch (...)
{
SysPushErrorCatch();
}
this->callback = {};
}
}

View File

@ -0,0 +1,27 @@
/***
Copyright (C) 2021-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncFuncRunnable.hpp
Date: 2023-12-06
Date: 2021-11-2
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct AsyncFuncRunnable : IAsyncRunnable
{
AuFunction<void()> callback;
AuFunction<void()> fail;
AuCriticalSection lock;
AsyncFuncRunnable(AuFunction<void()> &&callback);
AsyncFuncRunnable(AuFunction<void()> &&callback, AuFunction<void()> &&fail);
AsyncFuncRunnable(const AuFunction<void()> &callback);
AsyncFuncRunnable(const AuFunction<void()> &callback, const AuFunction<void()> &fail);
void RunAsync() override;
void CancelAsync() override;
};
}

View File

@ -0,0 +1,62 @@
/***
Copyright (C) 2021-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncFuncWorker.cpp
Date: 2023-12-06
Date: 2021-11-2
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "IAsyncRunnable.hpp"
#include "AuAsyncFuncWorker.hpp"
#include "ThreadPool.hpp"
namespace Aurora::Async
{
AsyncFuncWorker::AsyncFuncWorker(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
AuVoidFunc &&func) :
WorkItem(owner, worker, {}),
func(func)
{ }
void AsyncFuncWorker::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
auto func = AuExchange(this->func, {});
if (!this->CheckAlive())
{
return;
}
if (func)
{
func();
}
}
void AsyncFuncWorker::Cleanup()
{
AuResetMember(this->func);
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkFunction(const WorkerPId_t &worker,
AuVoidFunc func)
{
AU_DEBUG_MEMCRUNCH;
if (!func)
{
SysPushErrorArg("WorkItem has null function");
return {};
}
auto pWorker = worker.GetPool();
if (!pWorker)
{
pWorker = GetCurrentWorkerPId().GetPool();
}
return AuMakeSharedThrow<AsyncFuncWorker>(AuStaticPointerCast<ThreadPool>(pWorker).get(), worker, AuMove(func));
}
}

View File

@ -0,0 +1,27 @@
/***
Copyright (C) 2021-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncFuncWorker.hpp
Date: 2023-12-06
Date: 2021-11-2
Author: Reece
***/
#pragma once
#include "WorkItem.hpp"
namespace Aurora::Async
{
struct AsyncFuncWorker : WorkItem
{
AsyncFuncWorker(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
AuVoidFunc &&func);
void DispatchTask(IWorkItemHandler::ProcessInfo &info) override;
void Cleanup() override;
private:
AuVoidFunc func;
};
}

View File

@ -0,0 +1,121 @@
/***
Copyright (C) 2021-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncTimer.cpp
Date: 2023-12-06
Date: 2021-11-2
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "IAsyncRunnable.hpp"
#include "AuAsyncTimer.hpp"
#include "ThreadPool.hpp"
namespace Aurora::Async
{
AsyncFuncTimer::AsyncFuncTimer(IThreadPoolInternal *owner,
const AuSPtr<IAsyncTimerCallback> &pCallback,
const WorkerPId_t &worker,
AuUInt64 uNextTickTime,
AuUInt64 uInterval) :
WorkItem(owner, worker, {}),
uNextTickTime(uNextTickTime),
uInterval(uInterval)
{
}
void AsyncFuncTimer::CancelTimer()
{
this->Cancel();
}
AuUInt64 AsyncFuncTimer::GetLastTime()
{
return this->uLastTickTime;
}
AuUInt64 AsyncFuncTimer::GetTicks()
{
return this->uTickCount;
}
void AsyncFuncTimer::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
info.type = ETickType::eRerun;
auto uTickCount = ++this->uTickCount;
this->uLastTickTime = AuTime::SteadyClockNS();
auto uDelta = this->uLastTickTime - this->uNextTickTime;
this->uNextTickTime += this->uInterval;
this->uNextTickTime = AuMax(this->uNextTickTime, this->uLastTickTime);
info.reschedSteadyClockAbsNs = this->uNextTickTime;
if (this->pCallback->OnTick(uTickCount, uDelta, this->uLastTickTime))
{
info.type = ETickType::eSchedule;
}
else
{
info.type = ETickType::eFinished;
}
}
void AsyncFuncTimer::Cleanup()
{
AuResetMember(this->pCallback);
}
AUKN_SYM AuSPtr<IAsyncTimer> NewTimer(AuUInt64 uPeriodNS,
const AuSPtr<IAsyncTimerCallback> &pCallback,
AuOptional<WorkerPId_t> workerPid,
AuOptional<AuUInt64> uStartTime)
{
if (!pCallback)
{
SysPushErrorArg();
return {};
}
if (!workerPid)
{
workerPid = AuAsync::GetCurrentWorkerPId();
}
if (uPeriodNS < 100)
{
SysPushErrorArg();
return {};
}
if (!uStartTime)
{
uStartTime = AuTime::SteadyClockNS() + uPeriodNS;
}
auto pThreadPool = AuStaticPointerCast<ThreadPool>(workerPid.value().GetPool()).get();
auto pRet = AuMakeShared<AsyncFuncTimer>(pThreadPool,
pCallback,
workerPid.value(),
uStartTime.value(),
uPeriodNS);
if (!pRet)
{
return {};
}
pRet->Dispatch();
return pRet;
}
AUKN_SYM AuSPtr<IAsyncTimer> NewTimer(AuUInt64 uPeriodNS,
const AuSupplierConsumer<bool, AuUInt64, AuUInt64, AuUInt64> &callback,
AuOptional<WorkerPId_t> workerPid,
AuOptional<AuUInt64> uStartTime)
{
return NewTimer(uPeriodNS,
AuMakeSharedThrow<IAsyncTimerCallbackFunctional>(callback),
workerPid,
uStartTime);
}
}

View File

@ -0,0 +1,38 @@
/***
Copyright (C) 2021-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuAsyncTimer.hpp
Date: 2023-12-06
Author: Reece
***/
#pragma once
#include "WorkItem.hpp"
namespace Aurora::Async
{
struct AsyncFuncTimer :
WorkItem,
IAsyncTimer
{
AsyncFuncTimer(IThreadPoolInternal *owner,
const AuSPtr<IAsyncTimerCallback> &pCallback,
const WorkerPId_t &worker,
AuUInt64 uNextTickTime,
AuUInt64 uInterval);
void CancelTimer() override;
AuUInt64 GetLastTime() override;
AuUInt64 GetTicks() override;
void DispatchTask(IWorkItemHandler::ProcessInfo &info) override;
void Cleanup() override;
private:
AuSPtr<IAsyncTimerCallback> pCallback;
WorkerPId_t worker;
AuUInt64 uNextTickTime {};
AuUInt64 uLastTickTime {};
AuUInt64 uInterval {};
AuUInt64 uTickCount {};
};
}

View File

@ -9,6 +9,7 @@
#include "AuGroupWorkQueue.hpp"
#include "Async.hpp"
#include "ThreadPool.hpp"
#include "IAsyncRunnable.hpp"
namespace Aurora::Async
{
@ -17,7 +18,7 @@ namespace Aurora::Async
AU_DEBUG_MEMCRUNCH;
auto prio = (int)entry.second->GetPrio();
SysAssert(prio < AuAsync::kEWorkPrioCount, "Invalid PRIO");
SysAssert(prio < AuAsync::kEWorkPriorityCount, "Invalid PRIO");
AU_LOCK_GUARD(this->mutex);
this->sortedWork[prio].push_back(entry);
@ -34,7 +35,7 @@ namespace Aurora::Async
bool GroupWorkQueue::IsEmpty()
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
for (AU_ITERATE_N(i, AuAsync::kEWorkPriorityCount))
{
if (this->sortedWork[i].size())
{
@ -58,7 +59,7 @@ namespace Aurora::Async
#else
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
for (AU_ITERATE_N(i, AuAsync::kEWorkPriorityCount))
{
for (const auto &[srcId, pA] : this->sortedWork[i])
{
@ -77,9 +78,9 @@ namespace Aurora::Async
{
AU_LOCK_GUARD(this->mutex);
for (AU_ITERATE_N(i, AuAsync::kEWorkPrioCount))
for (AU_ITERATE_N(i, AuAsync::kEWorkPriorityCount))
{
auto &group = this->sortedWork[(int)AuAsync::kEWorkPrioMaxLegal - i];
auto &group = this->sortedWork[(int)AuAsync::kEWorkPriorityMaxLegal - i];
for (auto itr = group.begin(); ((itr != group.end()) && (queue.size() < maxPopCount)); )
{

View File

@ -17,9 +17,9 @@ namespace Aurora::Async
struct GroupWorkQueue
{
AuThreadPrimitives::Mutex mutex;
AuMutex mutex;
AuUInt32 uItems {};
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPrioCount];
AuList<WorkEntry_t> sortedWork[AuAsync::kEWorkPriorityCount];
bool IsEmpty();
bool IsEmpty(ThreadPool *pPool, AuWorkerId_t id);

View File

@ -11,6 +11,7 @@
//#include "AsyncApp.hpp"
#include "ThreadPool.hpp"
#include <Console/Commands/Commands.hpp>
#include "IAsyncRunnable.hpp"
namespace Aurora::Async
{
@ -190,7 +191,7 @@ namespace Aurora::Async
AUKN_SYM void SetMainThreadForSysPumpScheduling(AuWorkerPId_t pid)
{
if (!pid.pool)
if (!pid)
{
gRuntimeConfig.async.bEnableLegacyTicks = gBOriginal;
AuResetMember(gMainThread);

View File

@ -0,0 +1,30 @@
/***
Copyright (C) 2021-2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IAsyncRunnable.hpp
Date: 2023-12-06
Date: 2021-11-2
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct IAsyncRunnable
{
virtual void RunAsync() = 0;
inline virtual EWorkPriority GetPrio()
{
return EWorkPriority::ePriorityNormal;
};
virtual void CancelAsync()
{ }
inline virtual AuOptional<AuPair<AuUInt32, AuUInt32>> QueryFences()
{
return {};
}
};
}

View File

@ -12,6 +12,9 @@
#include "WorkItem.hpp"
#include "AuSchedular.hpp"
#include "ThreadWorkerQueueShim.hpp"
#include "IAsyncRunnable.hpp"
#include "AuAsyncFuncRunnable.hpp"
#include "AuAsyncFuncWorker.hpp"
namespace Aurora::Async
{
@ -58,7 +61,7 @@ namespace Aurora::Async
bool ThreadPool::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
{
return WaitFor(WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, unlocker }, primitive, timeoutMs);
return WaitFor(WorkerPId_t { AuAsync::GetCurrentWorkerPId().GetPool(), unlocker }, primitive, timeoutMs);
}
bool ThreadPool::WaitFor(WorkerPId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
@ -70,7 +73,7 @@ namespace Aurora::Async
(GetThreadWorkersCount(unlocker.first) == 1));
if ((unlocker.first == pCurThread->thread.id.first) &&
(unlocker.pool.get() == this) &&
(unlocker.GetPool().get() == this) &&
(bWorkerIdMatches))
{
@ -92,12 +95,12 @@ namespace Aurora::Async
{
AuSPtr<ThreadState> pHandle;
if (auto pPool = unlocker.pool)
if (auto pPool = unlocker)
{
auto pPoolEx = AuStaticCast<ThreadPool>(unlocker.pool);
auto pPoolEx = AuStaticCast<ThreadPool>(unlocker.GetPool());
AU_LOCK_GUARD(pPoolEx->rwlock_->AsReadable());
if ((pHandle = AuStaticCast<ThreadPool>(unlocker.pool)->GetThreadHandle(unlocker)))
if ((pHandle = AuStaticCast<ThreadPool>(unlocker.GetPool())->GetThreadHandle(unlocker)))
{
AU_LOCK_GUARD(pHandle->externalFencesLock);
if (pHandle->exitingflag2)
@ -109,7 +112,7 @@ namespace Aurora::Async
pHandle->externalFences.push_back(primitive.get());
}
}
else if (unlocker.pool.get() == this)
else if (unlocker.GetPool().get() == this)
{
return primitive->LockMS(timeoutMs);
}
@ -688,7 +691,7 @@ namespace Aurora::Async
{
SysAssert(callback);
return AuMakeShared<FuncWorker>(this, WorkerPId_t { this->SharedFromThis(), worker }, AuMove(callback));
return AuMakeShared<AsyncFuncWorker>(this, WorkerPId_t { this->SharedFromThis(), worker }, AuMove(callback));
}
AuSPtr<IWorkItem> ThreadPool::NewFence()
@ -769,6 +772,45 @@ namespace Aurora::Async
return {};
}
AuSPtr<IO::IIOProcessor> ThreadPool::GetSelfIOProcessor()
{
auto pid = GetCurrentWorkerPId();
if (pid)
{
return pid.GetPool()->GetIOProcessor(pid);
}
else
{
return {};
}
}
AuSPtr<IO::Net::INetInterface> ThreadPool::GetSelfIONetInterface()
{
auto pid = GetCurrentWorkerPId();
if (pid)
{
return pid.GetPool()->GetIONetInterface(pid);
}
else
{
return {};
}
}
AuSPtr<IO::Net::INetWorker> ThreadPool::GetSelfIONetWorker()
{
auto pid = GetCurrentWorkerPId();
if (pid)
{
return pid.GetPool()->GetIONetWorker(pid);
}
else
{
return {};
}
}
bool ThreadPool::Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal)
{
AU_LOCK_GUARD(this->pRWReadView);
@ -1105,9 +1147,9 @@ namespace Aurora::Async
}, []() -> void
{
auto pid = GetCurrentWorkerPId();
if (pid.pool)
if (pid)
{
GetWorkerInternal(pid.pool)->ThisExiting();
GetWorkerInternal(pid.GetPool())->ThisExiting();
}
}));

View File

@ -64,6 +64,10 @@ namespace Aurora::Async
virtual AuSPtr<AuIO::Net::INetInterface> GetIONetInterface(WorkerId_t pid) override;
virtual AuSPtr<AuIO::Net::INetWorker> GetIONetWorker(WorkerId_t id) override;
AuSPtr<IO::IIOProcessor> GetSelfIOProcessor();
AuSPtr<IO::Net::INetInterface> GetSelfIONetInterface();
AuSPtr<IO::Net::INetWorker> GetSelfIONetWorker();
virtual bool Sync(WorkerId_t workerId, AuUInt32 timeoutMs, bool requireSignal) override;
virtual void Signal(WorkerId_t workerId) override;
virtual AuSPtr<AuLoop::ILoopSource> WorkerToLoopSource(WorkerId_t id) override;
@ -114,7 +118,9 @@ namespace Aurora::Async
WorkerWPId_t()
{}
WorkerWPId_t(const WorkerPId_t &ref) : WorkerId_t(ref.first, ref.second), pool(ref.pool)
WorkerWPId_t(const WorkerPId_t &ref) :
WorkerId_t(ref.first, ref.second),
pool(ref.GetPool())
{}
AuWPtr<IThreadPool> pool;

View File

@ -7,7 +7,7 @@
***/
#pragma once
#include "AsyncRunnable.hpp"
#include "IThreadPoolInternal.hpp"
#include "AuThreadState.hpp"
#include "AuGroupWorkQueue.hpp"

View File

@ -10,6 +10,7 @@
#include "WorkItem.hpp"
#include "AsyncApp.hpp"
#include "AuSchedular.hpp"
#include "ThreadPool.hpp"
#if defined(AURORA_COMPILER_CLANG)
// warning: enumeration values 'kEnumCount' not handled in switch [-Wswitch
@ -19,14 +20,6 @@
namespace Aurora::Async
{
FuncWorker::FuncWorker(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
AuVoidFunc &&func) :
WorkItem(owner, worker, {}),
func(func)
{
}
WorkItem::WorkItem(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
const AuSPtr<IWorkItemHandler> &task) :
@ -278,19 +271,25 @@ namespace Aurora::Async
SendOff();
}
EWorkPrio WorkItem::GetPrio()
EWorkPriority WorkItem::GetPrio()
{
return this->prio_;
}
void WorkItem::SetPrio(EWorkPrio prio)
void WorkItem::SetPrio(EWorkPriority prio)
{
if (!EWorkPriorityIsValid(prio))
{
SysPushErrorArg();
return;
}
this->prio_ = prio;
}
void WorkItem::CancelAsync()
{
AU_TRY_LOCK_GUARD_NAMED(this->lock2, asd);
AU_LOCK_GUARD(this->lock2);
this->Fail();
}
@ -346,6 +345,11 @@ namespace Aurora::Async
}
}
void WorkItem::Cleanup()
{
}
AuSPtr<ThreadState> WorkItem::GetState()
{
if (this->worker_.HasValue())
@ -467,6 +471,8 @@ namespace Aurora::Async
pIOWatch->StopWatch();
}
this->Cleanup();
AuResetMember(this->pIOWatchLS);
}
@ -493,6 +499,8 @@ namespace Aurora::Async
this->waiters_.clear();
this->waitOn_.clear();
this->Cleanup();
if (this->finishedEvent_)
{
@ -633,27 +641,22 @@ namespace Aurora::Async
this->owner_->Run(this->worker_.value(), AuSharedFromThis());
}
}
void *WorkItem::GetPrivateData()
{
if (!this->task_)
{
return nullptr;
}
return this->task_->GetPrivateData();
}
inline auto ToInternal(const AuSPtr<IThreadPool> &pool)
{
return AuStaticPointerCast<ThreadPool>(pool);
}
void FuncWorker::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
auto func = AuExchange(this->func, {});
if (!this->CheckAlive())
{
return;
}
if (func)
{
func();
}
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task)
{
AU_DEBUG_MEMCRUNCH;
@ -664,7 +667,7 @@ namespace Aurora::Async
return {};
}
auto pWorker = GetCurrentWorkerPId().pool;
auto pWorker = GetCurrentWorkerPId().GetPool();
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(static_cast<IAsyncApp *>(gAsyncApp));
@ -673,30 +676,6 @@ namespace Aurora::Async
return AuMakeShared<WorkItem>(ToInternal(pWorker).get(), WorkerPId_t { pWorker , worker }, task);
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkFunction(const WorkerPId_t &worker, AuVoidFunc func)
{
AU_DEBUG_MEMCRUNCH;
if (!func)
{
SysPushErrorArg("WorkItem has null function");
return {};
}
auto pWorker = worker.pool;
if (!pWorker)
{
pWorker = GetCurrentWorkerPId().pool;
}
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(static_cast<IAsyncApp *>(gAsyncApp));
}
return AuMakeSharedThrow<FuncWorker>(ToInternal(pWorker).get(), worker, AuMove(func));
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task)
{
AU_DEBUG_MEMCRUNCH;
@ -707,10 +686,10 @@ namespace Aurora::Async
return {};
}
auto pWorker = worker.pool;
auto pWorker = worker.GetPool();
if (!pWorker)
{
pWorker = GetCurrentWorkerPId().pool;
pWorker = GetCurrentWorkerPId().GetPool();
}
if (!pWorker)
@ -723,21 +702,11 @@ namespace Aurora::Async
AUKN_SYM AuSPtr<IWorkItem> NewFence()
{
auto pWorker = GetCurrentWorkerPId().pool;
auto pWorker = GetCurrentWorkerPId().GetPool();
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(static_cast<IAsyncApp *>(gAsyncApp));
}
return AuMakeShared<WorkItem>((IThreadPoolInternal *)ToInternal(pWorker).get(), WorkerPId_t {}, AuSPtr<IWorkItemHandler>{});
}
void *WorkItem::GetPrivateData()
{
if (!this->task_)
{
return nullptr;
}
return this->task_->GetPrivateData();
}
}

View File

@ -8,6 +8,7 @@
#pragma once
#include "ThreadPool.hpp"
#include "IAsyncRunnable.hpp"
namespace Aurora::Async
{
@ -49,8 +50,8 @@ namespace Aurora::Async
void *GetPrivateData() override;
EWorkPrio GetPrio() override;
void SetPrio(EWorkPrio prio) override;
EWorkPriority GetPrio() override;
void SetPrio(EWorkPriority prio) override;
AuOptional<AuPair<AuUInt32, AuUInt32>> QueryFences() override;
@ -67,13 +68,14 @@ namespace Aurora::Async
void DispatchExLocked(bool check);
virtual void DispatchTask(IWorkItemHandler::ProcessInfo &info);
virtual void Cleanup();
IThreadPoolInternal *owner_ {};
AuSPtr<ThreadState> GetState();
AuSPtr<IWorkItemHandler> task_;
AuOptionalEx<WorkerPId_t> worker_;
EWorkPrio prio_ = EWorkPrio::eNormalPrio;
EWorkPriority prio_ = EWorkPriority::ePriorityNormal;
AuList<AuSPtr<IWorkItem>> waitOn_;
AuList<AuSPtr<IWorkItem>> waiters_;
AuThreadPrimitives::CriticalSection lock;
@ -82,7 +84,7 @@ namespace Aurora::Async
AuUInt32 uShutdownCookie {};
AuOptionalEx<AuUInt32> optOtherCookie {};
AuSPtr<AuIO::IIOProcessorItem> pIOWatch;
AuSPtr<IO::Loop::ILoopSource> pIOWatchLS;
AuSPtr<AuIO::Loop::ILoopSource> pIOWatchLS;
bool finished {};
bool failed {};
@ -95,15 +97,4 @@ namespace Aurora::Async
void SendOff();
};
struct FuncWorker : WorkItem
{
FuncWorker(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
AuVoidFunc &&func);
void DispatchTask(IWorkItemHandler::ProcessInfo &info) override;
private:
AuVoidFunc func;
};
}

View File

@ -89,7 +89,7 @@ namespace Aurora::IO
if (asyncWorker)
{
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(asyncWorker.value().pool)->uAtomicIOProcessors, 1u);
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(asyncWorker.value().GetPool())->uAtomicIOProcessors, 1u);
}
return true;
@ -649,7 +649,7 @@ namespace Aurora::IO
{
if (auto optWorker = this->asyncWorker)
{
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().pool)->uAtomicIOProcessorsWorthlessSources, 1u);
AuAtomicAdd(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().GetPool())->uAtomicIOProcessorsWorthlessSources, 1u);
}
this->ToQueue()->SourceAdd(this->timers.pLsTicker);
@ -661,7 +661,7 @@ namespace Aurora::IO
{
if (!this->pWorkItem)
{
this->pWorkItem = this->asyncWorker.value().pool->NewWorkItem(this->asyncWorker.value(), AuSharedFromThis());
this->pWorkItem = this->asyncWorker.value().GetPool()->NewWorkItem(this->asyncWorker.value(), AuSharedFromThis());
}
this->pWorkItem->SetSchedTimeNs(this->refreshRateNs);
@ -704,7 +704,7 @@ namespace Aurora::IO
if (auto optWorker = this->asyncWorker)
{
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().pool)->uAtomicIOProcessorsWorthlessSources, 1u);
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().GetPool())->uAtomicIOProcessorsWorthlessSources, 1u);
}
}
@ -880,7 +880,7 @@ namespace Aurora::IO
{
if (auto optWorker = this->asyncWorker)
{
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().pool)->uAtomicIOProcessors, 1u);
AuAtomicSub(&AuStaticCast<AuAsync::ThreadPool>(optWorker.value().GetPool())->uAtomicIOProcessors, 1u);
}
RemoveTimer();
@ -968,20 +968,20 @@ namespace Aurora::IO
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id)
{
if (!id.pool)
if (!id.GetPool())
{
SysPushErrorArg();
return {};
}
auto thread = id.pool->ResolveHandle(id);
auto thread = id.GetPool()->ResolveHandle(id);
if (!thread)
{
SysPushErrorGeneric("Worker PID failed to resolve to a thread object");
return {};
}
auto queue = id.pool->ToKernelWorkQueue(id);
auto queue = id.GetPool()->ToKernelWorkQueue(id);
if (!queue)
{
SysPushErrorGeneric("Worker PID has no kernel work queue");

View File

@ -44,12 +44,12 @@ namespace Aurora::IO::Loop
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid)
{
if (!workerPid.pool)
if (!workerPid)
{
return Async::GetAsyncApp()->WorkerToLoopSource(workerPid);
}
return workerPid.pool->WorkerToLoopSource(workerPid);
return workerPid.GetPool()->WorkerToLoopSource(workerPid);
}
#if defined(AURORA_IS_MODERNNT_DERIVED)

View File

@ -105,7 +105,7 @@ namespace Aurora::IO::Net
AuSPtr<Waiter> temp;
auto pid = AuAsync::GetCurrentWorkerPId();
if (pid.pool)
if (pid)
{
temp = AuMakeShared<Waiter>(work, callback, pid);
}