Remove ID 0 restriction from Aurora async, breaking the API
This commit is contained in:
parent
2c5a492f08
commit
3fb8e2bf8c
@ -24,12 +24,12 @@ namespace Aurora::Async
|
|||||||
/// 1+ = user defined
|
/// 1+ = user defined
|
||||||
///
|
///
|
||||||
/// ThreadId_t:
|
/// ThreadId_t:
|
||||||
/// 0 = invalid
|
/// -1 = invalid
|
||||||
/// index+1 = tid/runner id
|
/// index = tid/runner id
|
||||||
///
|
///
|
||||||
using WorkerId_t = AuPair<ThreadGroup_t, ThreadId_t>;
|
using WorkerId_t = AuPair<ThreadGroup_t, ThreadId_t>;
|
||||||
|
|
||||||
using DispatchTarget_t = AuPair<ThreadGroup_t, AuOptional<ThreadId_t>>;
|
static const ThreadId_t kThreadIdAny = -1;
|
||||||
|
|
||||||
struct WorkPriv
|
struct WorkPriv
|
||||||
{
|
{
|
||||||
@ -181,9 +181,17 @@ namespace Aurora::Async
|
|||||||
public:
|
public:
|
||||||
virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) = 0;
|
virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) = 0;
|
||||||
virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;
|
virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;
|
||||||
|
|
||||||
|
// ms = time relative to the current time
|
||||||
virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;
|
virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;
|
||||||
|
|
||||||
|
// ns = time relative to the current time
|
||||||
virtual AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) = 0;
|
virtual AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) = 0;
|
||||||
|
|
||||||
|
// ms = time relative to the time at which the work item would otherwise dispatch
|
||||||
virtual AuSPtr<IWorkItem> AddDelayTime(AuUInt32 ms) = 0;
|
virtual AuSPtr<IWorkItem> AddDelayTime(AuUInt32 ms) = 0;
|
||||||
|
|
||||||
|
// ns = time relative to the time at which the work item would otherwise dispatch
|
||||||
virtual AuSPtr<IWorkItem> AddDelayTimeNs(AuUInt64 ns) = 0;
|
virtual AuSPtr<IWorkItem> AddDelayTimeNs(AuUInt64 ns) = 0;
|
||||||
|
|
||||||
virtual AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) = 0;
|
virtual AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) = 0;
|
||||||
@ -199,7 +207,7 @@ namespace Aurora::Async
|
|||||||
virtual AuOptional<void *> ToWorkResultT() = 0;
|
virtual AuOptional<void *> ToWorkResultT() = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
|
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
|
||||||
|
|
||||||
#pragma region EASE_OF_READING
|
#pragma region EASE_OF_READING
|
||||||
struct BasicWorkStdFunc : IWorkItemHandler
|
struct BasicWorkStdFunc : IWorkItemHandler
|
||||||
@ -530,13 +538,13 @@ namespace Aurora::Async
|
|||||||
#pragma endregion EASE_OF_READING
|
#pragma endregion EASE_OF_READING
|
||||||
|
|
||||||
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
|
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
|
||||||
static AuSPtr<Async::IWorkItem> DispatchBasicWorkCallback(const DispatchTarget_t &worker, const Task_t &task, const Job_t &job, bool enableWait = false)
|
static AuSPtr<Async::IWorkItem> DispatchBasicWorkCallback(const WorkerId_t &worker, const Task_t &task, const Job_t &job, bool enableWait = false)
|
||||||
{
|
{
|
||||||
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job), enableWait)->Dispatch();
|
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job), enableWait)->Dispatch();
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
|
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
|
||||||
static AuSPtr<Async::IWorkItem> DispatchBasicWorkCallback(const DispatchTarget_t &worker, const Task_t &task, const Job_t &job, const Info_t &inputParameters, bool enableWait = false)
|
static AuSPtr<Async::IWorkItem> DispatchBasicWorkCallback(const WorkerId_t &worker, const Task_t &task, const Job_t &job, const Info_t &inputParameters, bool enableWait = false)
|
||||||
{
|
{
|
||||||
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job, inputParameters), enableWait)->Dispatch();
|
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job, inputParameters), enableWait)->Dispatch();
|
||||||
}
|
}
|
||||||
@ -559,16 +567,8 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
// Synchronization
|
// Synchronization
|
||||||
// Note: syncing to yourself will nullify requireSignal to prevent deadlock
|
// Note: syncing to yourself will nullify requireSignal to prevent deadlock
|
||||||
virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0;
|
virtual bool Sync(WorkerId_t group, AuUInt32 timeoutMs = 0, bool requireSignal = false) = 0;
|
||||||
virtual void Signal(ThreadGroup_t group) = 0;
|
virtual void Signal(WorkerId_t group) = 0;
|
||||||
|
|
||||||
virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop
|
|
||||||
virtual bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop
|
|
||||||
|
|
||||||
|
|
||||||
virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0;
|
|
||||||
|
|
||||||
|
|
||||||
virtual void SyncAllSafe() = 0;
|
virtual void SyncAllSafe() = 0;
|
||||||
|
|
||||||
// Features
|
// Features
|
||||||
|
@ -52,6 +52,12 @@ static AuSPtr<T> AuUnsafeRaiiToShared(T *in)
|
|||||||
return AuSPtr<T>(in, [](T *){});
|
return AuSPtr<T>(in, [](T *){});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<typename T, class Z>
|
||||||
|
static AuSPtr<T> AuUnsafeRaiiToShared(const AuUPtr<T, Z> &in)
|
||||||
|
{
|
||||||
|
return AuSPtr<T>(in.get(), [](T *){});
|
||||||
|
}
|
||||||
|
|
||||||
template<typename T, int Z>
|
template<typename T, int Z>
|
||||||
static constexpr int AuArraySize(const T(&array)[Z])
|
static constexpr int AuArraySize(const T(&array)[Z])
|
||||||
{
|
{
|
||||||
|
@ -45,6 +45,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
Threading::Threads::ThreadShared_t threadObject;
|
Threading::Threads::ThreadShared_t threadObject;
|
||||||
|
|
||||||
|
//std::stack<jmp_buf> jmpStack;
|
||||||
AuWPtr<GroupState> parent;
|
AuWPtr<GroupState> parent;
|
||||||
|
|
||||||
Threading::Primitives::SemaphoreUnique_t syncSema;
|
Threading::Primitives::SemaphoreUnique_t syncSema;
|
||||||
@ -112,7 +113,7 @@ namespace Aurora::Async
|
|||||||
auto & semaphore = GetThreadState()->syncSema;
|
auto & semaphore = GetThreadState()->syncSema;
|
||||||
auto unsafeSemaphore = semaphore.get();
|
auto unsafeSemaphore = semaphore.get();
|
||||||
|
|
||||||
auto work = AuMakeShared</*Async::BasicWorkStdFunc*/AsyncFuncRunnable>(([=]()
|
auto work = AuMakeShared<AsyncFuncRunnable>(([=]()
|
||||||
{
|
{
|
||||||
auto state = GetThreadState();
|
auto state = GetThreadState();
|
||||||
|
|
||||||
@ -140,10 +141,10 @@ namespace Aurora::Async
|
|||||||
Run(worker, work);
|
Run(worker, work);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return WaitFor(worker, semaphore.get(), ms);
|
return WaitFor(worker, AuUnsafeRaiiToShared(semaphore), ms);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncApp::Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable)
|
void AsyncApp::Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
|
||||||
{
|
{
|
||||||
auto state = GetGroup(target.first);
|
auto state = GetGroup(target.first);
|
||||||
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
|
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
|
||||||
@ -156,12 +157,12 @@ namespace Aurora::Async
|
|||||||
#if defined(STAGING) || defined(DEBUG)
|
#if defined(STAGING) || defined(DEBUG)
|
||||||
AU_LOCK_GUARD(rwlock_->AsReadable());
|
AU_LOCK_GUARD(rwlock_->AsReadable());
|
||||||
|
|
||||||
if (target.second.has_value())
|
if (target.second != Async::kThreadIdAny)
|
||||||
{
|
{
|
||||||
auto itr = state->workers.find(*target.second);
|
auto itr = state->workers.find(target.second);
|
||||||
if ((itr == state->workers.end()) || (itr->second->rejecting))
|
if ((itr == state->workers.end()) || (itr->second->rejecting))
|
||||||
{
|
{
|
||||||
SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0));
|
SysPushErrorGen("worker: {}:{} is offline", target.first, target.second);
|
||||||
DecRunningTasks();
|
DecRunningTasks();
|
||||||
throw "Requested job worker is offline";
|
throw "Requested job worker is offline";
|
||||||
}
|
}
|
||||||
@ -191,7 +192,7 @@ namespace Aurora::Async
|
|||||||
state->workQueue.push_back(AuMakePair(target.second, runnable));
|
state->workQueue.push_back(AuMakePair(target.second, runnable));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (target.second.has_value())
|
if (target.second == Async::kThreadIdAny)
|
||||||
{
|
{
|
||||||
// sad :(
|
// sad :(
|
||||||
state->cvVariable->Broadcast();
|
state->cvVariable->Broadcast();
|
||||||
@ -202,6 +203,31 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int AsyncApp::CfxPollPush()
|
||||||
|
{
|
||||||
|
// TOOD (Reece): implement a context switching library
|
||||||
|
// Refer to the old implementation of this oin pastebin
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncApp::CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask)
|
||||||
|
{
|
||||||
|
// TOOD (Reece): implement a context switching library
|
||||||
|
// Refer to the old implementation of this oin pastebin
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncApp::CtxYield()
|
||||||
|
{
|
||||||
|
bool ranAtLeastOne = false;
|
||||||
|
|
||||||
|
while (this->Poll(false))
|
||||||
|
{
|
||||||
|
ranAtLeastOne = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ranAtLeastOne;
|
||||||
|
}
|
||||||
|
|
||||||
bool AsyncApp::Poll(bool blocking)
|
bool AsyncApp::Poll(bool blocking)
|
||||||
{
|
{
|
||||||
auto state = GetThreadState();
|
auto state = GetThreadState();
|
||||||
@ -209,6 +235,8 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
//state->pendingWorkItems.clear();
|
//state->pendingWorkItems.clear();
|
||||||
|
|
||||||
|
auto magic = CfxPollPush();
|
||||||
|
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(group->cvWorkMutex);
|
AU_LOCK_GUARD(group->cvWorkMutex);
|
||||||
|
|
||||||
@ -272,6 +300,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (state->pendingWorkItems.empty())
|
if (state->pendingWorkItems.empty())
|
||||||
{
|
{
|
||||||
|
CtxPollReturn(state, magic, false);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -305,6 +334,9 @@ namespace Aurora::Async
|
|||||||
state->pendingWorkItems.clear();
|
state->pendingWorkItems.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
CtxPollReturn(state, magic, true);
|
||||||
|
|
||||||
if (runningTasks == 0)
|
if (runningTasks == 0)
|
||||||
{
|
{
|
||||||
ShutdownZero();
|
ShutdownZero();
|
||||||
@ -313,67 +345,39 @@ namespace Aurora::Async
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AsyncApp::WaitFor(WorkerId_t worker, Threading::IWaitable *primitive, AuUInt32 timeoutMs)
|
bool AsyncApp::WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 timeoutMs)
|
||||||
{
|
{
|
||||||
auto curThread = GetThreadState();
|
auto curThread = GetThreadState();
|
||||||
|
|
||||||
if (worker == curThread->id)
|
bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
|
||||||
{
|
|
||||||
// TODO: nest counter or jump out
|
|
||||||
while (!Threading::WaitFor(primitive, 2))
|
|
||||||
{
|
|
||||||
while (this->Poll(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return Threading::WaitFor(primitive, timeoutMs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool AsyncApp::WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 timeoutMs)
|
|
||||||
{
|
|
||||||
auto curThread = GetThreadState();
|
|
||||||
|
|
||||||
bool workerIdMatches = (!unlocker.second.has_value()) && (unlocker.second.value() == curThread->id.second);
|
|
||||||
|
|
||||||
if ((unlocker.first == curThread->id.first) && // work group matches
|
if ((unlocker.first == curThread->id.first) && // work group matches
|
||||||
((GetThreadWorkersCount(unlocker.first) < 2) || // is there anyone besides us who might deal with this? unlikely fast path
|
(workerIdMatches)) // well, crap
|
||||||
(workerIdMatches))) // well, crap
|
|
||||||
{
|
{
|
||||||
if ((workerIdMatches) &&
|
|
||||||
(unlocker.first != 0)) // UI code is always hacky. dont judge people for nesting tasks within tasks.
|
|
||||||
// if theres a stack overflow problem, the native dev responsable for the sysloop and ui would already know about it
|
|
||||||
{
|
|
||||||
LogWarn("Nested Task: {}:{}. This is not an error, it's just bad practice.", unlocker.first, unlocker.second.value_or(0));
|
|
||||||
SysPushErrorLogicError("[telemetry] Nested Task: {}:{}", unlocker.first, unlocker.second.value_or(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: timeout isn't respected here as well
|
bool queryAsync = false;
|
||||||
while (!Threading::WaitFor(primitive, 2))
|
while (!(queryAsync ? primitive->TryLock() : Threading::WaitFor(primitive.get(), 2)))
|
||||||
{
|
{
|
||||||
while (this->Poll(false));
|
queryAsync = CtxYield();
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return Threading::WaitFor(primitive, timeoutMs);
|
return Threading::WaitFor(primitive.get(), timeoutMs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncApp::Start()
|
void AsyncApp::Start()
|
||||||
{
|
{
|
||||||
SysAssert(Spawn({0, 1}));
|
SysAssert(Spawn({0, 0}));
|
||||||
StartSched();
|
StartSched();
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncApp::Main()
|
void AsyncApp::Main()
|
||||||
{
|
{
|
||||||
Entrypoint({0, 1});
|
Entrypoint({0, 0});
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncApp::ShutdownZero()
|
void AsyncApp::ShutdownZero()
|
||||||
@ -471,12 +475,6 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
AU_LOCK_GUARD(rwlock_->AsWritable());
|
AU_LOCK_GUARD(rwlock_->AsWritable());
|
||||||
|
|
||||||
if (workerId.second == 0)
|
|
||||||
{
|
|
||||||
LogWarn("WorkerIds must not start from zero to preserve std::optiona nullability");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
AuSPtr<GroupState> group;
|
AuSPtr<GroupState> group;
|
||||||
|
|
||||||
// Try fetch or allocate group
|
// Try fetch or allocate group
|
||||||
@ -539,8 +537,10 @@ namespace Aurora::Async
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id)
|
|
||||||
|
AuSPtr<ThreadState> AsyncApp::GetThreadHandle(WorkerId_t id)
|
||||||
{
|
{
|
||||||
|
AU_LOCK_GUARD(rwlock_->AsReadable());
|
||||||
auto group = GetGroup(id.first);
|
auto group = GetGroup(id.first);
|
||||||
if (!group)
|
if (!group)
|
||||||
{
|
{
|
||||||
@ -553,7 +553,12 @@ namespace Aurora::Async
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret->get()->threadObject;
|
return *ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
Threading::Threads::ThreadShared_t AsyncApp::ResolveHandle(WorkerId_t id)
|
||||||
|
{
|
||||||
|
return GetThreadHandle(id)->threadObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
AuBST<ThreadGroup_t, AuList<ThreadId_t>> AsyncApp::GetThreads()
|
AuBST<ThreadGroup_t, AuList<ThreadId_t>> AsyncApp::GetThreads()
|
||||||
@ -582,14 +587,15 @@ namespace Aurora::Async
|
|||||||
return tlsWorkerId;
|
return tlsWorkerId;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs)
|
bool AsyncApp::Sync(WorkerId_t groupId, AuUInt32 timeoutMs, bool requireSignal)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(rwlock_->AsReadable());
|
AU_LOCK_GUARD(rwlock_->AsReadable());
|
||||||
|
|
||||||
auto group = GetGroup(groupId);
|
auto group = GetGroup(groupId.first);
|
||||||
|
|
||||||
auto currentWorkerId = GetCurrentThread().second;
|
auto currentWorkerId = GetCurrentThread().second;
|
||||||
|
|
||||||
|
if (groupId.second == Async::kThreadIdAny)
|
||||||
|
{
|
||||||
for (auto &jobWorker : group->workers)
|
for (auto &jobWorker : group->workers)
|
||||||
{
|
{
|
||||||
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
|
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
|
||||||
@ -597,25 +603,31 @@ namespace Aurora::Async
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return Barrier(groupId, timeoutMs, requireSignal && groupId.second != currentWorkerId, false);
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncApp::Signal(ThreadGroup_t groupId)
|
void AsyncApp::Signal(WorkerId_t groupId)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(rwlock_->AsReadable());
|
AU_LOCK_GUARD(rwlock_->AsReadable());
|
||||||
|
|
||||||
auto group = GetGroup(groupId);
|
auto group = GetGroup(groupId.first);
|
||||||
|
if (groupId.second == Async::kThreadIdAny)
|
||||||
|
{
|
||||||
for (auto &jobWorker : group->workers)
|
for (auto &jobWorker : group->workers)
|
||||||
{
|
{
|
||||||
jobWorker.second->running->Set();
|
jobWorker.second->running->Set();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
bool AsyncApp::SyncTimeout(ThreadGroup_t group, AuUInt32 ms)
|
|
||||||
{
|
{
|
||||||
return Sync(group, false, ms);
|
GetThreadHandle(groupId)->running->Set();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncApp::SyncAllSafe()
|
void AsyncApp::SyncAllSafe()
|
||||||
|
@ -36,13 +36,11 @@ namespace Aurora::Async
|
|||||||
WorkerId_t GetCurrentThread() override;
|
WorkerId_t GetCurrentThread() override;
|
||||||
|
|
||||||
// Synchronization
|
// Synchronization
|
||||||
bool Sync(ThreadGroup_t group, bool requireSignal, AuUInt32 timeout) override;
|
bool Sync(WorkerId_t group, AuUInt32 timeoutMs, bool requireSignal) override;
|
||||||
void Signal(ThreadGroup_t group) override;
|
void Signal(WorkerId_t group) override;
|
||||||
|
|
||||||
bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) override; // when unlocker = this, pump event loop
|
bool WaitFor(WorkerId_t unlocker, const AuSPtr<Threading::IWaitable> &primitive, AuUInt32 ms); // when unlocker = this, pump event loop
|
||||||
bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) override; // when unlocker = this, pump event loop
|
//bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) override; // when unlocker = this, pump event loop
|
||||||
|
|
||||||
bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) override;
|
|
||||||
|
|
||||||
void SyncAllSafe() override;
|
void SyncAllSafe() override;
|
||||||
|
|
||||||
@ -53,13 +51,19 @@ namespace Aurora::Async
|
|||||||
void AssertInThreadGroup(ThreadGroup_t group) override;
|
void AssertInThreadGroup(ThreadGroup_t group) override;
|
||||||
void AssertWorker(WorkerId_t id) override;
|
void AssertWorker(WorkerId_t id) override;
|
||||||
|
|
||||||
void Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
|
void Run(WorkerId_t target, AuSPtr<IAsyncRunnable> runnable);
|
||||||
|
|
||||||
bool Poll(bool block) override;
|
bool Poll(bool block) override;
|
||||||
|
|
||||||
size_t GetThreadWorkersCount(ThreadGroup_t group);
|
size_t GetThreadWorkersCount(ThreadGroup_t group);
|
||||||
|
|
||||||
|
bool CtxYield();
|
||||||
|
int CfxPollPush();
|
||||||
|
void CtxPollReturn(const AuSPtr<ThreadState> &state, int status, bool hitTask);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
AuSPtr<ThreadState> GetThreadHandle(WorkerId_t id);
|
||||||
|
|
||||||
void ThisExiting();
|
void ThisExiting();
|
||||||
void ShutdownZero();
|
void ShutdownZero();
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ namespace Aurora::Async
|
|||||||
struct SchedEntry
|
struct SchedEntry
|
||||||
{
|
{
|
||||||
AuUInt64 ns;
|
AuUInt64 ns;
|
||||||
DispatchTarget_t target;
|
WorkerId_t target;
|
||||||
AuSPtr<IAsyncRunnable> runnable;
|
AuSPtr<IAsyncRunnable> runnable;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -128,14 +128,14 @@ namespace Aurora::Async
|
|||||||
gThread.reset();
|
gThread.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable)
|
void Schedule(AuUInt64 ns, WorkerId_t target, AuSPtr<IAsyncRunnable> runnable)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(gSchedLock);
|
AU_LOCK_GUARD(gSchedLock);
|
||||||
IncRunningTasks();
|
IncRunningTasks();
|
||||||
gEntries.push_back({ns, target, runnable});
|
gEntries.push_back({ns, target, runnable});
|
||||||
}
|
}
|
||||||
|
|
||||||
void TerminateSceduledTasks(DispatchTarget_t target)
|
void TerminateSceduledTasks(WorkerId_t target)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(gSchedLock);
|
AU_LOCK_GUARD(gSchedLock);
|
||||||
|
|
||||||
|
@ -14,6 +14,6 @@ namespace Aurora::Async
|
|||||||
void StartSched();
|
void StartSched();
|
||||||
void StopSched();
|
void StopSched();
|
||||||
|
|
||||||
void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
|
void Schedule(AuUInt64 ns, WorkerId_t target, AuSPtr<IAsyncRunnable> runnable);
|
||||||
void TerminateSceduledTasks(DispatchTarget_t target);
|
void TerminateSceduledTasks(WorkerId_t target);
|
||||||
}
|
}
|
@ -13,12 +13,12 @@
|
|||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
WorkItem::WorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking) : worker_(worker), task_(task)
|
WorkItem::WorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking) : worker_(worker), task_(task)
|
||||||
{
|
{
|
||||||
if (supportsBlocking)
|
if (supportsBlocking)
|
||||||
{
|
{
|
||||||
finishedEvent_ = Threading::Primitives::EventUnique(false, true, true);
|
finishedEvent_ = Threading::Primitives::EventUnique(false, true, true);
|
||||||
SysAssert(finishedEvent_ ? true : false);
|
SysAssert(finishedEvent_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -263,7 +263,7 @@ namespace Aurora::Async
|
|||||||
bool WorkItem::BlockUntilComplete()
|
bool WorkItem::BlockUntilComplete()
|
||||||
{
|
{
|
||||||
if (!finishedEvent_) return false;
|
if (!finishedEvent_) return false;
|
||||||
return Async::GetAsyncApp()->WaitFor(this->worker_, finishedEvent_.get(), 0);
|
return static_cast<AsyncApp *>(GetAsyncApp())->WaitFor(this->worker_, AuUnsafeRaiiToShared(finishedEvent_), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool WorkItem::HasFinished()
|
bool WorkItem::HasFinished()
|
||||||
@ -292,7 +292,7 @@ namespace Aurora::Async
|
|||||||
static_cast<AsyncApp *>(GetAsyncApp())->Run(worker_, this->shared_from_this());
|
static_cast<AsyncApp *>(GetAsyncApp())->Run(worker_, this->shared_from_this());
|
||||||
}
|
}
|
||||||
|
|
||||||
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
|
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
|
||||||
{
|
{
|
||||||
if (!task)
|
if (!task)
|
||||||
{
|
{
|
||||||
|
@ -12,7 +12,7 @@ namespace Aurora::Async
|
|||||||
class WorkItem : public IWorkItem, public IAsyncRunnable, public std::enable_shared_from_this<WorkItem>
|
class WorkItem : public IWorkItem, public IAsyncRunnable, public std::enable_shared_from_this<WorkItem>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
WorkItem(const DispatchTarget_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking);
|
WorkItem(const WorkerId_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking);
|
||||||
~WorkItem();
|
~WorkItem();
|
||||||
|
|
||||||
AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) override;
|
AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) override;
|
||||||
@ -41,7 +41,7 @@ namespace Aurora::Async
|
|||||||
private:
|
private:
|
||||||
void DispatchEx(bool check);
|
void DispatchEx(bool check);
|
||||||
AuSPtr<IWorkItemHandler> task_;
|
AuSPtr<IWorkItemHandler> task_;
|
||||||
DispatchTarget_t worker_;
|
WorkerId_t worker_;
|
||||||
AuList<AuSPtr<IWorkItem>> waitOn_;
|
AuList<AuSPtr<IWorkItem>> waitOn_;
|
||||||
AuList<AuSPtr<IWorkItem>> waiters_;
|
AuList<AuSPtr<IWorkItem>> waiters_;
|
||||||
Threading::Primitives::SpinLock lock;
|
Threading::Primitives::SpinLock lock;
|
||||||
|
@ -18,7 +18,7 @@ namespace Aurora::Console::Commands
|
|||||||
static AuList<CommandDispatch> gPendingCommands;
|
static AuList<CommandDispatch> gPendingCommands;
|
||||||
static auto gMutex = Threading::Primitives::MutexUnique();
|
static auto gMutex = Threading::Primitives::MutexUnique();
|
||||||
static auto gPendingCommandsMutex = Threading::Primitives::MutexUnique();
|
static auto gPendingCommandsMutex = Threading::Primitives::MutexUnique();
|
||||||
static AuOptional<Async::DispatchTarget_t> gCommandDispatcher;
|
static AuOptional<Async::WorkerId_t> gCommandDispatcher;
|
||||||
|
|
||||||
struct Command
|
struct Command
|
||||||
{
|
{
|
||||||
@ -91,12 +91,13 @@ namespace Aurora::Console::Commands
|
|||||||
return Dispatch(string);
|
return Dispatch(string);
|
||||||
}
|
}
|
||||||
|
|
||||||
void UpdateDispatcher(AuOptional<Async::DispatchTarget_t> target)
|
void UpdateDispatcher(AuOptional<Async::WorkerId_t> target)
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(gMutex);
|
AU_LOCK_GUARD(gMutex);
|
||||||
AU_LOCK_GUARD(gPendingCommandsMutex);
|
AU_LOCK_GUARD(gPendingCommandsMutex);
|
||||||
|
|
||||||
if ((!target.has_value()) && (gCommandDispatcher == target))
|
// process commands before async app termination
|
||||||
|
if ((!target.has_value()))
|
||||||
{
|
{
|
||||||
auto commands = std::exchange(gPendingCommands, {});
|
auto commands = std::exchange(gPendingCommands, {});
|
||||||
for (const auto &command : commands)
|
for (const auto &command : commands)
|
||||||
@ -124,7 +125,7 @@ namespace Aurora::Console::Commands
|
|||||||
auto commands = std::exchange(gPendingCommands, {});
|
auto commands = std::exchange(gPendingCommands, {});
|
||||||
gPendingCommandsMutex->Unlock();
|
gPendingCommandsMutex->Unlock();
|
||||||
|
|
||||||
if (gCommandDispatcher.value_or(Async::DispatchTarget_t{0, 0}) == Async::DispatchTarget_t{0, 0})
|
if (gCommandDispatcher.value_or(Async::WorkerId_t{}) == Async::WorkerId_t{})
|
||||||
{
|
{
|
||||||
DispatchCommandsFromThis(commands);
|
DispatchCommandsFromThis(commands);
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,6 @@
|
|||||||
|
|
||||||
namespace Aurora::Console::Commands
|
namespace Aurora::Console::Commands
|
||||||
{
|
{
|
||||||
void UpdateDispatcher(AuOptional<Async::DispatchTarget_t> target);
|
void UpdateDispatcher(AuOptional<Async::WorkerId_t> target);
|
||||||
void PumpCommands();
|
void PumpCommands();
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user