Improve workitem api
[+] New waitfor by work dispatcher type
This commit is contained in:
parent
1f8d06cbf0
commit
df301a4101
@ -80,16 +80,16 @@ namespace Aurora::Async
|
||||
class IWorkItem
|
||||
{
|
||||
public:
|
||||
virtual void WaitFor(const AuSPtr<IWorkItem> &workItem) = 0;
|
||||
virtual void WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;
|
||||
virtual void SetSchedTime(AuUInt32 ms) = 0;
|
||||
virtual void SetSchedTimeNs(AuUInt64 ns) = 0;
|
||||
virtual AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) = 0;
|
||||
virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;
|
||||
virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;
|
||||
virtual AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) = 0;
|
||||
|
||||
virtual void Dispatch() = 0;
|
||||
virtual AuSPtr<IWorkItem> Dispatch() = 0;
|
||||
|
||||
virtual bool BlockUntilComplete() = 0;
|
||||
virtual bool HasFinished() = 0;
|
||||
virtual bool HasFailed() = 0;
|
||||
virtual bool BlockUntilComplete() = 0;
|
||||
virtual bool HasFinished() = 0;
|
||||
virtual bool HasFailed() = 0;
|
||||
};
|
||||
|
||||
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
|
||||
@ -285,7 +285,7 @@ namespace Aurora::Async
|
||||
}
|
||||
};
|
||||
|
||||
template<typename B, typename... Args>
|
||||
template<typename... Args>
|
||||
static std::function<void(Args...)> TranslateAsyncFunctionToDispatcher(std::function<void(Args...)> func)
|
||||
{
|
||||
auto cur = GetAsyncApp()->GetCurrentThread();
|
||||
@ -340,10 +340,13 @@ namespace Aurora::Async
|
||||
virtual bool Sync(ThreadGroup_t group, bool requireSignal = false, AuUInt32 timeout = 0) = 0;
|
||||
virtual void Signal(ThreadGroup_t group) = 0;
|
||||
|
||||
virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) = 0; // when unlocker = this, pump event loop
|
||||
|
||||
virtual bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop
|
||||
virtual bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) = 0; // when unlocker = this, pump event loop
|
||||
|
||||
|
||||
virtual bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) = 0;
|
||||
|
||||
|
||||
virtual void SyncAllSafe() = 0;
|
||||
|
||||
// Features
|
||||
|
@ -261,12 +261,42 @@ namespace Aurora::Async
|
||||
}
|
||||
|
||||
|
||||
bool AsyncApp::WaitFor(WorkerId_t worker, Aurora::Threading::IWaitable *primitive, int timeoutMs)
|
||||
bool AsyncApp::WaitFor(WorkerId_t worker, Threading::IWaitable *primitive, AuUInt32 timeoutMs)
|
||||
{
|
||||
auto curThread = GetThreadState();
|
||||
|
||||
if (worker == curThread->id)
|
||||
{
|
||||
// TODO: nest counter or jump out
|
||||
while (!Threading::WaitFor(primitive, 2))
|
||||
{
|
||||
while (this->Poll(false));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return Threading::WaitFor(primitive, timeoutMs);
|
||||
}
|
||||
}
|
||||
|
||||
bool AsyncApp::WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 timeoutMs)
|
||||
{
|
||||
auto curThread = GetThreadState();
|
||||
|
||||
bool workerIdMatches = (!unlocker.second.has_value()) && (unlocker.second.value() == curThread->id.second);
|
||||
|
||||
if ((unlocker.first == curThread->id.first) && // work group matches
|
||||
((GetThreadWorkersCount(unlocker.first) < 2) || // is there anyone besides us who might deal with this? unlikely fast path
|
||||
(workerIdMatches))) // well, crap
|
||||
{
|
||||
if (workerIdMatches)
|
||||
{
|
||||
LogWarn("Nested Task: {}:{}", unlocker.first, unlocker.second);
|
||||
SysPushErrorLogicError("Nested Task: {}:{}", unlocker.first, unlocker.second);
|
||||
}
|
||||
|
||||
// TODO: timeout isn't respected here as well
|
||||
while (!Threading::WaitFor(primitive, 2))
|
||||
{
|
||||
@ -515,6 +545,12 @@ namespace Aurora::Async
|
||||
return *ret;
|
||||
}
|
||||
|
||||
size_t AsyncApp::GetThreadWorkersCount(ThreadGroup_t group)
|
||||
{
|
||||
Threading::LockGuardPtr lock(rwlock_->AsReadable());
|
||||
return GetGroup(group)->workers.size();
|
||||
}
|
||||
|
||||
AuSPtr<ThreadState> AsyncApp::GetThreadState()
|
||||
{
|
||||
auto id = GetCurrentThread();
|
||||
|
@ -31,14 +31,15 @@ namespace Aurora::Async
|
||||
// Spawning
|
||||
bool Spawn(WorkerId_t) override;
|
||||
Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
|
||||
AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
|
||||
AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
|
||||
WorkerId_t GetCurrentThread() override;
|
||||
|
||||
// Synchronization
|
||||
bool Sync(ThreadGroup_t group, bool requireSignal, AuUInt32 timeout) override;
|
||||
void Signal(ThreadGroup_t group) override;
|
||||
|
||||
bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, int ms) override; // when unlocker = this, pump event loop
|
||||
bool WaitFor(WorkerId_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) override; // when unlocker = this, pump event loop
|
||||
bool WaitFor(DispatchTarget_t unlocker, Threading::IWaitable *primitive, AuUInt32 ms) override; // when unlocker = this, pump event loop
|
||||
|
||||
bool SyncTimeout(ThreadGroup_t group, AuUInt32 ms) override;
|
||||
|
||||
@ -56,6 +57,9 @@ namespace Aurora::Async
|
||||
void ShutdownOutOfTasks();
|
||||
|
||||
bool Poll(bool block) override;
|
||||
|
||||
size_t GetThreadWorkersCount(ThreadGroup_t group);
|
||||
|
||||
private:
|
||||
|
||||
// TODO: BarrierMultiple
|
||||
|
@ -27,7 +27,7 @@ namespace Aurora::Async
|
||||
//Fail();
|
||||
}
|
||||
|
||||
void WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
|
||||
AuSPtr<IWorkItem> WorkItem::WaitFor(const AuSPtr<IWorkItem> &workItem)
|
||||
{
|
||||
bool status {};
|
||||
|
||||
@ -50,9 +50,11 @@ namespace Aurora::Async
|
||||
{
|
||||
Fail();
|
||||
}
|
||||
|
||||
return AU_SHARED_FROM_THIS;
|
||||
}
|
||||
|
||||
void WorkItem::WaitFor(const AuList<AuSPtr<IWorkItem>> &workItems)
|
||||
AuSPtr<IWorkItem> WorkItem::WaitFor(const AuList<AuSPtr<IWorkItem>> &workItems)
|
||||
{
|
||||
bool status {};
|
||||
|
||||
@ -78,22 +80,27 @@ namespace Aurora::Async
|
||||
{
|
||||
Fail();
|
||||
}
|
||||
|
||||
return AU_SHARED_FROM_THIS;
|
||||
}
|
||||
|
||||
|
||||
void WorkItem::SetSchedTimeNs(AuUInt64 ns)
|
||||
AuSPtr<IWorkItem> WorkItem::SetSchedTimeNs(AuUInt64 ns)
|
||||
{
|
||||
dispatchTimeNs_ = Time::CurrentClockNS() + ns;
|
||||
return AU_SHARED_FROM_THIS;
|
||||
}
|
||||
|
||||
void WorkItem::SetSchedTime(AuUInt32 ms)
|
||||
AuSPtr<IWorkItem> WorkItem::SetSchedTime(AuUInt32 ms)
|
||||
{
|
||||
dispatchTimeNs_ = Time::CurrentClockNS() + (AuUInt64(ms) * AuUInt64(1000000));
|
||||
return AU_SHARED_FROM_THIS;
|
||||
}
|
||||
|
||||
void WorkItem::Dispatch()
|
||||
AuSPtr<IWorkItem> WorkItem::Dispatch()
|
||||
{
|
||||
DispatchEx(false);
|
||||
return AU_SHARED_FROM_THIS;
|
||||
}
|
||||
|
||||
void WorkItem::DispatchEx(bool check)
|
||||
@ -227,8 +234,7 @@ namespace Aurora::Async
|
||||
bool WorkItem::BlockUntilComplete()
|
||||
{
|
||||
if (!finishedEvent_) return false;
|
||||
finishedEvent_->Lock();
|
||||
return true;
|
||||
return Async::GetAsyncApp()->WaitFor(this->worker_, finishedEvent_.get(), 0);
|
||||
}
|
||||
|
||||
bool WorkItem::HasFinished()
|
||||
|
@ -15,12 +15,12 @@ namespace Aurora::Async
|
||||
WorkItem(const DispatchTarget_t &worker_, const AuSPtr<IWorkItemHandler> &task_, bool supportsBlocking);
|
||||
~WorkItem();
|
||||
|
||||
void WaitFor(const AuSPtr<IWorkItem> &workItem) override;
|
||||
void WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) override;
|
||||
void SetSchedTime(AuUInt32 ms) override;
|
||||
void SetSchedTimeNs(AuUInt64 ns) override;
|
||||
AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) override;
|
||||
AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) override;
|
||||
AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) override;
|
||||
AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) override;
|
||||
|
||||
void Dispatch() override;
|
||||
AuSPtr<IWorkItem> Dispatch() override;
|
||||
|
||||
bool BlockUntilComplete() override;
|
||||
bool HasFinished() override;
|
||||
|
Loading…
Reference in New Issue
Block a user