[+] Now with a scheduler!

This commit is contained in:
Reece Wilson 2021-06-30 13:00:32 +01:00
parent e30be64c44
commit c8c3908085
12 changed files with 141 additions and 25 deletions

View File

@ -43,7 +43,7 @@ namespace Aurora::Async
EProcessNext type; EProcessNext type;
AuList<AuSPtr<IWorkItem>> waitFor; AuList<AuSPtr<IWorkItem>> waitFor;
AuUInt32 reschedMs; AuUInt32 reschedMs;
//AuUInt64 reschedNs; AuUInt64 reschedNs;
}; };
virtual void DispatchFrame(ProcessInfo &info) = 0; virtual void DispatchFrame(ProcessInfo &info) = 0;
@ -77,13 +77,13 @@ namespace Aurora::Async
std::optional<Result_t>(* onFrame)(const Info_t &); std::optional<Result_t>(* onFrame)(const Info_t &);
}; };
class IWorkItem class IWorkItem
{ {
public: public:
virtual void WaitFor(const AuSPtr<IWorkItem> &workItem) = 0; virtual void WaitFor(const AuSPtr<IWorkItem> &workItem) = 0;
virtual void WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0; virtual void WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;
virtual void SetSchedTime(AuUInt32 ms) = 0; virtual void SetSchedTime(AuUInt32 ms) = 0;
virtual void SetSchedTimeNs(AuUInt64 ns) = 0;
virtual void Dispatch() = 0; virtual void Dispatch() = 0;
@ -92,7 +92,6 @@ namespace Aurora::Async
virtual bool HasFailed() = 0; virtual bool HasFailed() = 0;
}; };
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false); AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const DispatchTarget_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
struct BasicWorkStdFunc : IWorkItemHandler struct BasicWorkStdFunc : IWorkItemHandler
@ -324,5 +323,4 @@ namespace Aurora::Async
virtual void AssertInThreadGroup(ThreadGroup_t thread) = 0; virtual void AssertInThreadGroup(ThreadGroup_t thread) = 0;
virtual void AssertWorker(WorkerId_t id) = 0; virtual void AssertWorker(WorkerId_t id) = 0;
}; };
} }

View File

@ -7,11 +7,17 @@
***/ ***/
#include <RuntimeInternal.hpp> #include <RuntimeInternal.hpp>
#include "Async.hpp" #include "Async.hpp"
#include "Schedular.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
void InitAsync() void InitAsync()
{ {
InitSched();
}
void ShutdownSync()
{
ShutdownSched();
} }
} }

View File

@ -43,4 +43,5 @@ namespace Aurora::Async
}; };
void InitAsync(); void InitAsync();
void ShutdownSync();
} }

View File

@ -15,6 +15,19 @@ namespace Aurora::Async
static AsyncApp gAsyncApp; static AsyncApp gAsyncApp;
static std::atomic_int gRunningTasks {}; static std::atomic_int gRunningTasks {};
void IncRunningTasks()
{
gRunningTasks++;
}
void DecRunningTasks()
{
if ((--gRunningTasks) == 0)
{
gAsyncApp.ShutdownOutOfTasks();
}
}
//STATIC_TLS(WorkerId_t, tlsWorkerId); //STATIC_TLS(WorkerId_t, tlsWorkerId);
static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId; static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId;
@ -239,7 +252,7 @@ namespace Aurora::Async
runningTasks = --gRunningTasks; runningTasks = --gRunningTasks;
} }
if (runningTasks) if (runningTasks == 0)
{ {
ShutdownOutOfTasks(); ShutdownOutOfTasks();
} }

View File

@ -13,6 +13,10 @@ namespace Aurora::Async
struct ThreadState; struct ThreadState;
//class WorkItem; //class WorkItem;
void DecRunningTasks();
void IncRunningTasks();
class AsyncApp : public IAsyncApp class AsyncApp : public IAsyncApp
{ {
public: public:
@ -49,9 +53,10 @@ namespace Aurora::Async
void Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable); void Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
void ShutdownOutOfTasks();
private: private:
void ShutdownOutOfTasks();
// TODO: BarrierMultiple // TODO: BarrierMultiple
bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop); bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop);
bool Poll(bool a); bool Poll(bool a);

View File

@ -6,9 +6,86 @@
Author: Reece Author: Reece
***/ ***/
#include <RuntimeInternal.hpp> #include <RuntimeInternal.hpp>
#include "Async.hpp"
#include "Schedular.hpp" #include "Schedular.hpp"
#include "AsyncApp.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
struct SchedEntry
{
AuUInt64 ns;
DispatchTarget_t target;
AuSPtr<IAsyncRunnable> runnable;
};
static Threading::Threads::ThreadUnique_t gThread;
static Threading::Primitives::RWLockUnique_t gSchedLock;
static AuList<SchedEntry> gEntries;
static void GetDispatchableTasks(AuList<SchedEntry> &pending)
{
Threading::LockGuardPtr lock(gSchedLock->AsReadable());
auto time = Time::CurrentClockNS();
for (auto itr = gEntries.begin(); itr != gEntries.end(); )
{
if (itr->ns <= time)
{
pending.push_back(std::move(*itr));
itr = gEntries.erase(itr);
}
else
{
itr ++;
}
}
}
static void SchedThread()
{
auto thread = Threading::Threads::GetThread();
AuList<SchedEntry> pending;
while (!thread->Exiting())
{
Threading::SleepNs(1000000 / 2);
AuList<SchedEntry> pending;
GetDispatchableTasks(pending);
for (auto &entry : pending)
{
static_cast<AsyncApp *>(GetAsyncApp())->Run(entry.target, entry.runnable);
DecRunningTasks();
}
}
}
void InitSched()
{
gSchedLock = Threading::Primitives::RWLockUnique();
Threading::Threads::AbstractThreadVectors handler;
handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread)
{
SchedThread();
};
gThread = Threading::Threads::ThreadUnique(handler);
gThread->Run();
}
void ShutdownSched()
{
gThread.reset();
gSchedLock.reset();
}
void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable)
{
Threading::LockGuardPtr lock(gSchedLock->AsWritable());
IncRunningTasks();
gEntries.push_back({ns, target, runnable});
}
} }

View File

@ -9,5 +9,9 @@
namespace Aurora::Async namespace Aurora::Async
{ {
void InitSched();
void ShutdownSched();
void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
} }

View File

@ -9,6 +9,7 @@
#include "Async.hpp" #include "Async.hpp"
#include "WorkItem.hpp" #include "WorkItem.hpp"
#include "AsyncApp.hpp" #include "AsyncApp.hpp"
#include "Schedular.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {
@ -79,10 +80,15 @@ namespace Aurora::Async
} }
} }
void WorkItem::SetSchedTimeNs(AuUInt64 ns)
{
dispatchTimeNs_ = Time::CurrentClockNS() + ns;
}
void WorkItem::SetSchedTime(AuUInt32 ms) void WorkItem::SetSchedTime(AuUInt32 ms)
{ {
Threading::LockGuard<Threading::Primitives::SpinLock> l(lock); dispatchTimeNs_ = Time::CurrentClockNS() + (AuUInt64(ms) * AuUInt64(1000000));
dispatchTime_ = Time::CurrentClockMS() + ms;
} }
void WorkItem::Dispatch() void WorkItem::Dispatch()
@ -123,7 +129,7 @@ namespace Aurora::Async
itr = waitOn_.erase(itr); itr = waitOn_.erase(itr);
} }
if (Time::CurrentClockMS() < dispatchTime_) if (Time::CurrentClockNS() < dispatchTimeNs_ )
{ {
Schedule(); Schedule();
return; return;
@ -163,6 +169,10 @@ namespace Aurora::Async
{ {
SetSchedTime(info.reschedMs); SetSchedTime(info.reschedMs);
} }
if (info.reschedNs)
{
SetSchedTimeNs(info.reschedNs);
}
WaitFor(info.waitFor); WaitFor(info.waitFor);
} }
@ -233,7 +243,7 @@ namespace Aurora::Async
void WorkItem::Schedule() void WorkItem::Schedule()
{ {
// TODO: Aurora::Async::Schedule(dispatchTimeNs_, worker_, this->shared_from_this());
} }
void WorkItem::SendOff() void WorkItem::SendOff()

View File

@ -18,6 +18,7 @@ namespace Aurora::Async
void WaitFor(const AuSPtr<IWorkItem> &workItem) override; void WaitFor(const AuSPtr<IWorkItem> &workItem) override;
void WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) override; void WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) override;
void SetSchedTime(AuUInt32 ms) override; void SetSchedTime(AuUInt32 ms) override;
void SetSchedTimeNs(AuUInt64 ns) override;
void Dispatch() override; void Dispatch() override;
@ -40,7 +41,7 @@ namespace Aurora::Async
bool finished {}; bool finished {};
bool failed {}; bool failed {};
bool dispatchPending_ {}; bool dispatchPending_ {};
AuUInt32 dispatchTime_ {}; AuUInt64 dispatchTimeNs_ {};
void Fail(); void Fail();
void Schedule(); void Schedule();

View File

@ -21,10 +21,6 @@ namespace Aurora::Console::Hooks
void WriteLine(const ConsoleMessage &msg) void WriteLine(const ConsoleMessage &msg)
{ {
// Note: I believe waiting for the second pump to deposit each line
// would be a bad idea for warnings/debug logs.
// console messages must be written to the handler as soon as
// they are produced to alleviate loss of buffered data on termination.
gMutex->Lock(); gMutex->Lock();
auto callbacks = gLineCallbacks; auto callbacks = gLineCallbacks;
gMutex->Unlock(); gMutex->Unlock();
@ -36,12 +32,7 @@ namespace Aurora::Console::Hooks
callback(msg); callback(msg);
} }
} }
else [[unlikely]] // Note: yes, this is a write line function. else [[unlikely]]
// yes, .find('\n') is rather complex, but i dont care.
// logging FIO has and always will be the bottleneck.
// given the assumption this is slow, we may as well
// unfuck writelines with \n's. fmt's, unwinded stacks,
// etc all benefit from this
{ {
Aurora::Parse::SplitNewlines(msg.line, Aurora::Parse::SplitNewlines(msg.line,
[&](const AuString &line) [&](const AuString &line)

View File

@ -42,6 +42,7 @@ static void Deinit()
{ {
Aurora::RNG::Release(); Aurora::RNG::Release();
Aurora::Console::Exit(); Aurora::Console::Exit();
Aurora::Async::ShutdownSync();
} }
namespace Aurora namespace Aurora

View File

@ -71,7 +71,16 @@ namespace Aurora::IO::FS
} }
{ {
auto systemPath = gApplicationData + AuString(1, kPathSplitter) + fileName; auto systemPath = gHomeDirectory + fileName;
if (FileExists(systemPath))
{
path = systemPath;
return true;
}
}
{
auto systemPath = gApplicationData + fileName;
if (FileExists(systemPath)) if (FileExists(systemPath))
{ {
path = systemPath; path = systemPath;