[*] Fixed bug where schedular was using a read lock in a scope where items were erased from a vector

[*] Refactor a 'Object' member field in a ParseObject struct
[+] Added an option to set a command dispatcher thread from an AsyncApp
[*] Fix various issues with AsyncApp, stablity improvements
[+] Added AddDelayTime
This commit is contained in:
Reece Wilson 2021-07-15 17:16:23 +01:00
parent 7c44d3a02c
commit 72a74eb7a4
16 changed files with 362 additions and 96 deletions

View File

@ -84,6 +84,8 @@ namespace Aurora::Async
virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0; virtual AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) = 0;
virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0; virtual AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) = 0;
virtual AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) = 0; virtual AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) = 0;
virtual AuSPtr<IWorkItem> AddDelayTime(AuUInt32 ms) = 0;
virtual AuSPtr<IWorkItem> AddDelayTimeNs(AuUInt64 ns) = 0;
virtual AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) = 0; virtual AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) = 0;
@ -331,6 +333,7 @@ namespace Aurora::Async
virtual void Main() = 0; virtual void Main() = 0;
virtual void Shutdown() = 0; virtual void Shutdown() = 0;
virtual bool Exiting() = 0; virtual bool Exiting() = 0;
virtual void SetConsoleCommandDispatcher(WorkerId_t id) = 0;
// Spawning // Spawning
virtual bool Spawn(WorkerId_t) = 0; virtual bool Spawn(WorkerId_t) = 0;

View File

@ -35,7 +35,7 @@ namespace Aurora::Parse
struct ParseValueEx : ParseValue struct ParseValueEx : ParseValue
{ {
ParsedObject Object; ParsedObject object;
}; };
struct ParseBit struct ParseBit

View File

@ -124,11 +124,18 @@ namespace Aurora
AuList<AuString> whitelistedCerts{}; AuList<AuString> whitelistedCerts{};
}; };
struct AsyncConfig
{
AuUInt32 schedularFrequency {2}; // * 0.5 or 1 MS depending on the platform
AuUInt32 sysPumpFrequency {10}; // x amount of schedularFrequencys
};
struct RuntimeStartInfo struct RuntimeStartInfo
{ {
ConsoleConfig console; ConsoleConfig console;
CryptoConfig crypto; CryptoConfig crypto;
TelemetryConfig telemetry; TelemetryConfig telemetry;
AsyncConfig async;
}; };
AUKN_SYM void RuntimeStart(const RuntimeStartInfo &info); AUKN_SYM void RuntimeStart(const RuntimeStartInfo &info);

View File

@ -18,6 +18,6 @@ namespace Aurora::Async
void ShutdownAsync() void ShutdownAsync()
{ {
ShutdownSched(); DeinitSched();
} }
} }

View File

@ -9,6 +9,8 @@
#include "Async.hpp" #include "Async.hpp"
#include "AsyncApp.hpp" #include "AsyncApp.hpp"
#include "WorkItem.hpp" #include "WorkItem.hpp"
#include "Schedular.hpp"
#include <Console/Commands/Commands.hpp>
namespace Aurora::Async namespace Aurora::Async
{ {
@ -24,13 +26,15 @@ namespace Aurora::Async
{ {
if ((--gRunningTasks) == 0) if ((--gRunningTasks) == 0)
{ {
gAsyncApp.ShutdownOutOfTasks(); gAsyncApp.Shutdown();
} }
} }
//STATIC_TLS(WorkerId_t, tlsWorkerId); //STATIC_TLS(WorkerId_t, tlsWorkerId);
static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId; static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId;
using WorkEntry_t = std::pair<std::optional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;
struct ThreadState struct ThreadState
{ {
WorkerId_t id; WorkerId_t id;
@ -47,6 +51,7 @@ namespace Aurora::Async
bool rejecting {}; bool rejecting {};
bool exiting {}; bool exiting {};
bool shuttingdown {};
Threading::Primitives::EventUnique_t running; Threading::Primitives::EventUnique_t running;
//bool running; //bool running;
@ -56,7 +61,7 @@ namespace Aurora::Async
return id.first == 0; return id.first == 0;
} }
AuList<AuSPtr<IAsyncRunnable>> pendingWorkItems; AuList<WorkEntry_t> pendingWorkItems;
}; };
struct GroupState struct GroupState
@ -66,7 +71,6 @@ namespace Aurora::Async
Threading::Primitives::ConditionMutexUnique_t cvWorkMutex; Threading::Primitives::ConditionMutexUnique_t cvWorkMutex;
Threading::Primitives::ConditionVariableUnique_t cvVariable; Threading::Primitives::ConditionVariableUnique_t cvVariable;
using WorkEntry_t = std::pair<std::optional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;
AuList<WorkEntry_t> workQueue; AuList<WorkEntry_t> workQueue;
AuBST<ThreadId_t, AuSPtr<ThreadState>> workers; AuBST<ThreadId_t, AuSPtr<ThreadState>> workers;
@ -122,6 +126,11 @@ namespace Aurora::Async
} }
unsafeSemaphore->Unlock(1); unsafeSemaphore->Unlock(1);
if (requireSignal)
{
state->running->Lock();
}
})); }));
#if 0 #if 0
@ -138,7 +147,7 @@ namespace Aurora::Async
auto state = GetGroup(target.first); auto state = GetGroup(target.first);
SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group"); SysAssert(static_cast<bool>(state), "couldn't dispatch a task to an offline group");
gRunningTasks++; IncRunningTasks();
{ {
Threading::LockGuardPtr lol(state->cvWorkMutex); Threading::LockGuardPtr lol(state->cvWorkMutex);
@ -148,9 +157,11 @@ namespace Aurora::Async
if (target.second.has_value()) if (target.second.has_value())
{ {
if (state->workers[*target.second]->rejecting) auto itr = state->workers.find(*target.second);
if ((itr == state->workers.end()) || (itr->second->rejecting))
{ {
SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0)); SysPushErrorGen("worker: {}:{} is offline", target.first, target.second.value_or(0));
DecRunningTasks();
throw "Requested job worker is offline"; throw "Requested job worker is offline";
} }
} }
@ -170,6 +181,7 @@ namespace Aurora::Async
if (!found) if (!found)
{ {
DecRunningTasks();
throw "No workers available"; throw "No workers available";
} }
} }
@ -201,6 +213,19 @@ namespace Aurora::Async
do do
{ {
// Deque tasks the current thread runner could dipatch
// Noting that `multipopCount` determines how aggressive threads are in dequeuing work
// It's probable `multipopCount` will equal 1 for your use case
//
// Only increment when you know tasks within a group queue will not depend on one another
// *and* tasks require a small amount of execution time
//
// This could be potentially useful for an event dispatcher whereby you're dispatching
// hundreds of items per second, across a thread or two, knowing dequeuing one instead of all
// is a waste of CPU cycles.
//
// Remember, incrementing `multipopCount` is potentially dangerous the second you have local
// thread group waits
for (auto itr = group->workQueue.begin(); for (auto itr = group->workQueue.begin();
((itr != group->workQueue.end()) && ((itr != group->workQueue.end()) &&
(state->pendingWorkItems.size() < state->multipopCount)); (state->pendingWorkItems.size() < state->multipopCount));
@ -208,14 +233,14 @@ namespace Aurora::Async
{ {
if (!itr->first.has_value()) if (!itr->first.has_value())
{ {
state->pendingWorkItems.push_back((*itr).second); state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr); itr = group->workQueue.erase(itr);
continue; continue;
} }
if ((itr->first.has_value()) && (itr->first.value() == state->id.second)) if ((itr->first.has_value()) && (itr->first.value() == state->id.second))
{ {
state->pendingWorkItems.push_back((*itr).second); state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr); itr = group->workQueue.erase(itr);
continue; continue;
} }
@ -223,16 +248,24 @@ namespace Aurora::Async
itr++; itr++;
} }
// Consider blocking for more work
if (!blocking) if (!blocking)
{ {
break; break;
} }
// Block if no work items are present
if (state->pendingWorkItems.empty()) if (state->pendingWorkItems.empty())
{ {
group->cvVariable->WaitForSignal(); group->cvVariable->WaitForSignal();
} }
// Post-wakeup thread terminating check
if (state->threadObject->Exiting() || state->shuttingdown)
{
break;
}
} while (state->pendingWorkItems.empty()); } while (state->pendingWorkItems.empty());
} }
@ -245,22 +278,39 @@ namespace Aurora::Async
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{ {
// TODO: we should be able to implement a watchdog later down the line if (state->threadObject->Exiting() || state->shuttingdown)
{
break;
}
// Set the last frame time for a watchdog later down the line
state->lastFrameTime = Time::CurrentClockMS(); state->lastFrameTime = Time::CurrentClockMS();
(*itr)->RunAsync();
// Dispatch
itr->second->RunAsync();
// Remove from our local job queue
itr = state->pendingWorkItems.erase(itr); itr = state->pendingWorkItems.erase(itr);
// Atomically decrement global task counter
runningTasks = --gRunningTasks; runningTasks = --gRunningTasks;
} }
// Return popped work back to the groups work pool when our -pump loops were preempted
if (state->pendingWorkItems.size())
{
Threading::LockGuardPtr lol(group->cvWorkMutex);
group->workQueue.insert(group->workQueue.end(), state->pendingWorkItems.begin(), state->pendingWorkItems.end());
}
if (runningTasks == 0) if (runningTasks == 0)
{ {
ShutdownOutOfTasks(); ShutdownZero();
} }
return true; return true;
} }
bool AsyncApp::WaitFor(WorkerId_t worker, Threading::IWaitable *primitive, AuUInt32 timeoutMs) bool AsyncApp::WaitFor(WorkerId_t worker, Threading::IWaitable *primitive, AuUInt32 timeoutMs)
{ {
auto curThread = GetThreadState(); auto curThread = GetThreadState();
@ -316,6 +366,7 @@ namespace Aurora::Async
void AsyncApp::Start() void AsyncApp::Start()
{ {
SysAssert(Spawn({0, 0})); SysAssert(Spawn({0, 0}));
StartSched();
} }
void AsyncApp::Main() void AsyncApp::Main()
@ -323,47 +374,76 @@ namespace Aurora::Async
Entrypoint({0, 0}); Entrypoint({0, 0});
} }
void AsyncApp::ShutdownZero()
{
Shutdown();
}
void AsyncApp::Shutdown() void AsyncApp::Shutdown()
{ {
// Set shutdown trigger // Nested shutdowns can happen a write lock
shuttingdown_ = true; {
Threading::LockGuardPtr lock(rwlock_->AsReadable());
if (shuttingdown_)
{
return;
}
}
// Unpause all // Set shutdown flag
for (auto &re : this->threads_)
{ {
for (auto &[id, worker] : re.second->workers) Threading::LockGuardPtr lock(rwlock_->AsWritable());
if (std::exchange(shuttingdown_, true))
{ {
return;
}
}
// Noting
// 1) that StopSched may lockup under a writable lock
// -> we will terminate a thread that may be dispatching a sys pump event
// 2) that barrier doesn't need to be under a write lock
//
// Perform the following shutdown of the schedular and other available threads under a read lock
{
Threading::LockGuardPtr lock(rwlock_->AsReadable());
StopSched();
for (auto &[groupId, group] : this->threads_)
{
for (auto &[id, worker] : group->workers)
{
Barrier(worker->id, 0, false, true);
}
}
}
// Finally set the shutdown flag on all of our thread contexts
// then release them from the runners/workers list
// then release all group contexts
AuList<Threading::Threads::ThreadShared_t> threads;
{
Threading::LockGuardPtr lock(rwlock_->AsWritable());
for (auto &[groupId, group] : this->threads_)
{
for (auto &[id, worker] : group->workers)
{
worker->shuttingdown = true;
if (groupId != 0)
{
worker->threadObject->SendExitSignal();
threads.push_back(worker->threadObject);
}
auto &event = worker->running; auto &event = worker->running;
if (event) if (event)
{ {
event->Set(); event->Set();
} }
} }
}
// Drop all tasks from this point onwards
for (auto &re : this->threads_)
{
for (auto &[id, worker] : re.second->workers)
{
SysAssert(Barrier(worker->id, 0, true, true));
}
}
// TODO: abort all pending tests
// Signal the event loop to abort
ShutdownOutOfTasks();
}
void AsyncApp::ShutdownOutOfTasks()
{
for (auto& [id, group]: this->threads_)
{
for (auto & [id, worker] : group->workers)
{
worker->threadObject->SendExitSignal();
}
if (group->cvVariable) if (group->cvVariable)
{ {
@ -373,6 +453,13 @@ namespace Aurora::Async
} }
} }
// Sync to shutdown threads to prevent a race condition whereby the async subsystem shuts down before the threads
for (const auto &thread : threads)
{
thread->Exit();
}
}
bool AsyncApp::Exiting() bool AsyncApp::Exiting()
{ {
return shuttingdown_ || GetThreadState()->exiting; return shuttingdown_ || GetThreadState()->exiting;
@ -421,7 +508,7 @@ namespace Aurora::Async
auto threadState = std::make_shared<ThreadState>(); auto threadState = std::make_shared<ThreadState>();
threadState->parent = group; threadState->parent = group;
threadState->running = Threading::Primitives::EventUnique(true, false, false); threadState->running = Threading::Primitives::EventUnique(true, false, true);
threadState->syncSema = Threading::Primitives::SemaphoreUnique(0); threadState->syncSema = Threading::Primitives::SemaphoreUnique(0);
threadState->id = workerId; threadState->id = workerId;
@ -490,7 +577,6 @@ namespace Aurora::Async
bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs) bool AsyncApp::Sync(ThreadGroup_t groupId, bool requireSignal, AuUInt32 timeoutMs)
{ {
Threading::LockGuardPtr lock(rwlock_->AsReadable()); Threading::LockGuardPtr lock(rwlock_->AsReadable());
auto group = GetGroup(groupId); auto group = GetGroup(groupId);
for (auto &jobWorker : group->workers) for (auto &jobWorker : group->workers)
@ -555,6 +641,7 @@ namespace Aurora::Async
AuSPtr<ThreadState> AsyncApp::GetThreadState() AuSPtr<ThreadState> AsyncApp::GetThreadState()
{ {
Threading::LockGuardPtr lock(rwlock_->AsReadable());
auto id = GetCurrentThread(); auto id = GetCurrentThread();
auto state = GetGroup(id.first); auto state = GetGroup(id.first);
return state->workers[id.second]; return state->workers[id.second];
@ -567,21 +654,80 @@ namespace Aurora::Async
auto auThread = Threading::Threads::GetThread(); auto auThread = Threading::Threads::GetThread();
auto job = GetThreadState(); auto job = GetThreadState();
while (!auThread->Exiting()) while ((!auThread->Exiting()) && (!job->shuttingdown))
{ {
// Do work (blocking) // Do work (blocking)
Poll(true); Poll(true);
// Synchronization after pause
job->running->Lock();
} }
for (const auto &thread : job->features) if (id != WorkerId_t {0, 0})
{
Threading::LockGuardPtr lock(rwlock_->AsReadable());
if (!shuttingdown_ && !job->rejecting)
{
// Pump and barrier + reject all after atomically
Barrier(id, 0, false, true);
}
}
ThisExiting();
if (id == WorkerId_t {0, 0})
{
Shutdown();
}
}
void AsyncApp::SetConsoleCommandDispatcher(WorkerId_t id)
{
commandDispatcher_ = id;
Console::Commands::UpdateDispatcher(commandDispatcher_);
}
void AsyncApp::ThisExiting()
{
auto id = GetCurrentThread();
auto state = GetGroup(id.first);
{
Threading::LockGuardPtr lock(rwlock_->AsWritable());
auto itr = state->workers.find(id.second);
auto &jobWorker = itr->second;
// This shouldn't be a problem; however, we're going to handle the one edge case where
// some angry sysadmin is spamming commands
if ((commandDispatcher_.has_value())
&& (commandDispatcher_.value() == id))
{
Console::Commands::UpdateDispatcher({});
}
// Abort scheduled tasks
TerminateSceduledTasks(id);
// Clean up thread features
// -> transferable TLS handles
// -> thread specific vms
// -> anything your brain wishes to imagination
for (const auto &thread : jobWorker->features)
{
try
{ {
thread->Cleanup(); thread->Cleanup();
} }
catch (...)
{
LogWarn("Couldn't clean up thread feature!");
Debug::PrintError();
}
}
job->features.clear(); jobWorker->features.clear();
state->workers.erase(itr);
}
} }
void AsyncApp::AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async) void AsyncApp::AddFeature(WorkerId_t id, AuSPtr<Threading::Threads::IThreadFeature> feature, bool async)
@ -592,8 +738,7 @@ namespace Aurora::Async
feature->Init(); feature->Init();
})); }));
auto workItem = NewWorkItem(id, work, !async); auto workItem = NewWorkItem(id, work, !async)->Dispatch();
workItem->Dispatch();
if (!async) if (!async)
{ {

View File

@ -27,6 +27,7 @@ namespace Aurora::Async
void Main() override; void Main() override;
void Shutdown() override; void Shutdown() override;
bool Exiting() override; bool Exiting() override;
void SetConsoleCommandDispatcher(WorkerId_t id) override;
// Spawning // Spawning
bool Spawn(WorkerId_t) override; bool Spawn(WorkerId_t) override;
@ -54,13 +55,13 @@ namespace Aurora::Async
void Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable); void Run(DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
void ShutdownOutOfTasks();
bool Poll(bool block) override; bool Poll(bool block) override;
size_t GetThreadWorkersCount(ThreadGroup_t group); size_t GetThreadWorkersCount(ThreadGroup_t group);
private: private:
void ThisExiting();
void ShutdownZero();
// TODO: BarrierMultiple // TODO: BarrierMultiple
bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop); bool Barrier(WorkerId_t, AuUInt32 ms, bool requireSignal, bool drop);
@ -77,5 +78,6 @@ namespace Aurora::Async
ThreadDb_t threads_; ThreadDb_t threads_;
bool shuttingdown_ {}; bool shuttingdown_ {};
std::optional<WorkerId_t> commandDispatcher_;
}; };
} }

View File

@ -20,12 +20,12 @@ namespace Aurora::Async
}; };
static Threading::Threads::ThreadUnique_t gThread; static Threading::Threads::ThreadUnique_t gThread;
static Threading::Primitives::RWLockUnique_t gSchedLock; static Threading::Primitives::MutexUnique_t gSchedLock;
static AuList<SchedEntry> gEntries; static AuList<SchedEntry> gEntries;
static void GetDispatchableTasks(AuList<SchedEntry> &pending) static void GetDispatchableTasks(AuList<SchedEntry> &pending)
{ {
Threading::LockGuardPtr lock(gSchedLock->AsReadable()); Threading::LockGuardPtr lock(gSchedLock.get());
auto time = Time::CurrentClockNS(); auto time = Time::CurrentClockNS();
@ -43,6 +43,7 @@ namespace Aurora::Async
} }
} }
static void SchedThread() static void SchedThread()
{ {
AuUInt32 counter {}; AuUInt32 counter {};
@ -52,30 +53,57 @@ namespace Aurora::Async
while (!thread->Exiting()) while (!thread->Exiting())
{ {
Threading::SleepNs(1000000 / 2); {
Threading::SleepNs(1000000 / 2 * gRuntimeConfig.async.schedularFrequency);
}
AuList<SchedEntry> pending; AuList<SchedEntry> pending;
GetDispatchableTasks(pending); GetDispatchableTasks(pending);
for (auto &entry : pending) for (auto &entry : pending)
{
try
{ {
static_cast<AsyncApp *>(GetAsyncApp())->Run(entry.target, entry.runnable); static_cast<AsyncApp *>(GetAsyncApp())->Run(entry.target, entry.runnable);
DecRunningTasks(); DecRunningTasks();
} }
catch (...)
{
LogWarn("Dropped scheduled task! Expect a leaky counter!");
LogWarn("Would you rather `Why u no exit?!` or `WHY DID U JUST CRASH REEEE` in production?");
Debug::PrintError();
}
}
counter++; counter++;
if ((counter % 4) == 0) if ((gRuntimeConfig.async.schedularFrequency != 1) || ((gRuntimeConfig.async.sysPumpFrequency) && (counter % gRuntimeConfig.async.sysPumpFrequency) == 0))
{ {
// TODO: noting this should be called from main; however, we only pump console for.now try
RuntimeSysPump(); {
NewWorkItem({0, 0}, std::make_shared<BasicWorkStdFunc>(RuntimeSysPump))->Dispatch();
}
catch (...)
{
LogWarn("Dropped SysRuntimePump");
Debug::PrintError();
} }
} }
} }
}
void InitSched() void InitSched()
{ {
gSchedLock = Threading::Primitives::RWLockUnique(); gSchedLock = Threading::Primitives::MutexUnique();
}
void DeinitSched()
{
gThread.reset();
gSchedLock.reset();
}
void StartSched()
{
Threading::Threads::AbstractThreadVectors handler; Threading::Threads::AbstractThreadVectors handler;
handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread) handler.DoRun = [=](const Threading::Threads::IAuroraThread *thread)
{ {
@ -85,16 +113,33 @@ namespace Aurora::Async
gThread->Run(); gThread->Run();
} }
void ShutdownSched() void StopSched()
{ {
gThread.reset(); gThread.reset();
gSchedLock.reset();
} }
void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable) void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable)
{ {
Threading::LockGuardPtr lock(gSchedLock->AsWritable()); Threading::LockGuardPtr lock(gSchedLock.get());
IncRunningTasks(); IncRunningTasks();
gEntries.push_back({ns, target, runnable}); gEntries.push_back({ns, target, runnable});
} }
void TerminateSceduledTasks(DispatchTarget_t target)
{
Threading::LockGuardPtr lock(gSchedLock.get());
for (auto itr = gEntries.begin(); itr != gEntries.end(); )
{
if (itr->target <= target)
{
itr->runnable->CancelAsync();
itr = gEntries.erase(itr);
}
else
{
itr ++;
}
}
}
} }

View File

@ -10,8 +10,10 @@
namespace Aurora::Async namespace Aurora::Async
{ {
void InitSched(); void InitSched();
void ShutdownSched(); void DeinitSched();
void StartSched();
void StopSched();
void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable); void Schedule(AuUInt64 ns, DispatchTarget_t target, AuSPtr<IAsyncRunnable> runnable);
void TerminateSceduledTasks(DispatchTarget_t target);
} }

View File

@ -105,6 +105,18 @@ namespace Aurora::Async
return AU_SHARED_FROM_THIS; return AU_SHARED_FROM_THIS;
} }
AuSPtr<IWorkItem> WorkItem::AddDelayTime(AuUInt32 ms)
{
delayTimeNs_ += AuUInt64(ms) * AuUInt64(1000000);
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::AddDelayTimeNs(AuUInt64 ns)
{
delayTimeNs_ += ns;
return AU_SHARED_FROM_THIS;
}
AuSPtr<IWorkItem> WorkItem::Dispatch() AuSPtr<IWorkItem> WorkItem::Dispatch()
{ {
DispatchEx(false); DispatchEx(false);
@ -144,12 +156,19 @@ namespace Aurora::Async
itr = waitOn_.erase(itr); itr = waitOn_.erase(itr);
} }
if (Time::CurrentClockNS() < dispatchTimeNs_ ) if (Time::CurrentClockNS() < dispatchTimeNs_)
{ {
Schedule(); Schedule();
return; return;
} }
if (auto delay = std::exchange(delayTimeNs_, {}))
{
dispatchTimeNs_ = delay;
Schedule();
return;
}
SendOff(); SendOff();
} }

View File

@ -19,6 +19,8 @@ namespace Aurora::Async
AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) override; AuSPtr<IWorkItem> WaitFor(const AuList<AuSPtr<IWorkItem>> &workItem) override;
AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) override; AuSPtr<IWorkItem> SetSchedTime(AuUInt32 ms) override;
AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) override; AuSPtr<IWorkItem> SetSchedTimeNs(AuUInt64 ns) override;
AuSPtr<IWorkItem> AddDelayTime(AuUInt32 ms) override;
AuSPtr<IWorkItem> AddDelayTimeNs(AuUInt64 ns) override;
AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) override; AuSPtr<IWorkItem> Then(const AuSPtr<IWorkItem> &next) override;
AuSPtr<IWorkItem> Dispatch() override; AuSPtr<IWorkItem> Dispatch() override;
@ -43,6 +45,7 @@ namespace Aurora::Async
bool failed {}; bool failed {};
bool dispatchPending_ {}; bool dispatchPending_ {};
AuUInt64 dispatchTimeNs_ {}; AuUInt64 dispatchTimeNs_ {};
AuUInt64 delayTimeNs_ {};
void Fail(); void Fail();
void Schedule(); void Schedule();

View File

@ -17,6 +17,8 @@ namespace Aurora::Console::Commands
static AuList<Hooks::LineHook_cb> gLineCallbacks; static AuList<Hooks::LineHook_cb> gLineCallbacks;
static AuList<CommandDispatch> gPendingCommands; static AuList<CommandDispatch> gPendingCommands;
static auto gMutex = Threading::Primitives::MutexUnique(); static auto gMutex = Threading::Primitives::MutexUnique();
static auto gPendingCommandsMutex = Threading::Primitives::MutexUnique();
static std::optional<Async::DispatchTarget_t> gCommandDispatcher;
struct Command struct Command
{ {
@ -37,7 +39,7 @@ namespace Aurora::Console::Commands
static bool Dispatch(const AuString &string) static bool Dispatch(const AuString &string)
{ {
Threading::WaitableLockGuard guard(gMutex.get()); Threading::WaitableLockGuard guard(gPendingCommandsMutex.get());
AuString tag; AuString tag;
AuString cmdParse; AuString cmdParse;
AuMach offset; AuMach offset;
@ -79,7 +81,7 @@ namespace Aurora::Console::Commands
void AddCommand(const AuString &tag, const Parse::ParseObject &commandStructure, const CommandCallback_cb &callback) void AddCommand(const AuString &tag, const Parse::ParseObject &commandStructure, const CommandCallback_cb &callback)
{ {
Threading::WaitableLockGuard guard(gMutex.get()); Threading::WaitableLockGuard guard(gPendingCommandsMutex.get());
gCommands.insert(std::make_pair(tag, Command(tag, commandStructure, callback))); gCommands.insert(std::make_pair(tag, Command(tag, commandStructure, callback)));
} }
@ -88,15 +90,57 @@ namespace Aurora::Console::Commands
return Dispatch(string); return Dispatch(string);
} }
void PumpCommands() void UpdateDispatcher(std::optional<Async::DispatchTarget_t> target)
{ {
gMutex->Lock(); gMutex->Lock();
auto commands = std::exchange(gPendingCommands, {}); gPendingCommandsMutex->Lock();
gMutex->Unlock();
if ((!target.has_value()) && (gCommandDispatcher == target))
{
auto commands = std::exchange(gPendingCommands, {});
for (const auto &command : commands) for (const auto &command : commands)
{ {
command.callback(command.arguments); command.callback(command.arguments);
} }
} }
gCommandDispatcher = target;
gPendingCommandsMutex->Unlock();
gMutex->Unlock();
}
static void DispatchCommandsFromThis(const AuList<CommandDispatch> &commands)
{
for (const auto &command : commands)
{
command.callback(command.arguments);
}
}
void PumpCommands()
{
gMutex->Lock();
gPendingCommandsMutex->Lock();
auto commands = std::exchange(gPendingCommands, {});
gPendingCommandsMutex->Unlock();
if (gCommandDispatcher.value_or(Async::DispatchTarget_t{0, 0}) == Async::DispatchTarget_t{0, 0})
{
DispatchCommandsFromThis(commands);
}
else
{
Async::NewWorkItem(gCommandDispatcher.value(),
std::make_shared<Async::BasicWorkStdFunc>([&commands]()
{
DispatchCommandsFromThis(commands);
}),
true)->Dispatch()->BlockUntilComplete();
}
gMutex->Unlock();
}
} }

View File

@ -9,5 +9,6 @@
namespace Aurora::Console::Commands namespace Aurora::Console::Commands
{ {
void UpdateDispatcher(std::optional<Async::DispatchTarget_t> target);
void PumpCommands(); void PumpCommands();
} }

View File

@ -475,7 +475,6 @@ namespace Aurora::Console::ConsoleWxWidgets
// the shitty wiki doesn't describe how we're supposed to dispatch cmds to the os loop (w/o a window) // the shitty wiki doesn't describe how we're supposed to dispatch cmds to the os loop (w/o a window)
if (!gWxConsoleReady) return; if (!gWxConsoleReady) return;
auto window = wxTheApp->GetTopWindow(); auto window = wxTheApp->GetTopWindow();
if (!window) return; if (!window) return;

View File

@ -441,7 +441,7 @@ namespace Aurora::Parse
// Bah // Bah
ok = Parse(nestedresult, parseBit.objectParse, context); ok = Parse(nestedresult, parseBit.objectParse, context);
// TODO: debug info // TODO: debug info
parsedSingle.Object = nestedresult.result; parsedSingle.object = nestedresult.result;
break; break;
} }
default: default:
@ -597,8 +597,8 @@ namespace Aurora::Parse
} }
case ParsableTag::kParseObject: case ParsableTag::kParseObject:
{ {
Serialize(!isArray ? parsed.value.single.Object : parsed.value.array[i].Object, ret); Serialize(!isArray ? parsed.value.single.object : parsed.value.array[i].object, ret);
parsedSingle.Object = nestedresult.result; parsedSingle.object = nestedresult.result;
break; break;
} }
default: default:

View File

@ -33,7 +33,7 @@ namespace Aurora::Threading
#if defined(AURORA_PLATFORM_LINUX) || defined(AURORA_PLATFORM_ANDROID) #if defined(AURORA_PLATFORM_LINUX) || defined(AURORA_PLATFORM_ANDROID)
usleep(timeout); usleep(timeout);
#else #else
YieldPollNs(true, timeout + Time::CurrentClockNS(), [=]() auto status = YieldPollNs(true, timeout + Time::CurrentInternalClockNS(), [=]()
{ {
return false; return false;
}); });

View File

@ -49,7 +49,8 @@ namespace Aurora::Threading
template<AuMach Flags> // forcefully optiMize by templating a constant argument template<AuMach Flags> // forcefully optiMize by templating a constant argument
static inline void _FastSnooze(long &count, AuUInt64 &startTime, AuUInt64 maxStallNS, int &alpha, int &bravo, bool &forceSpin) //, bool yieldFaster , long maxStallMS = 20) static inline void _FastSnooze(long &count, AuUInt64 &startTime, AuUInt64 maxStallNS, int &alpha, int &bravo, bool &forceSpin) //, bool yieldFaster , long maxStallMS = 20)
{ {
AuUInt64 now = Time::CurrentClockNS(); // TODO: rewrite me
AuUInt64 now = Time::CurrentInternalClockNS();
// Begin least likely checks, we're getting on now // Begin least likely checks, we're getting on now
// Ironically we need to burn off some CPU cycles // Ironically we need to burn off some CPU cycles
@ -96,19 +97,14 @@ namespace Aurora::Threading
#if defined(AURORA_PLATFORM_WIN32) #if defined(AURORA_PLATFORM_WIN32)
SHOULD_CTXSWAP(kPredictedNTOSSwitchTimeYDNS, kPredictedNTOSSwitchTimeRTNS) SHOULD_CTXSWAP(kPredictedNTOSSwitchTimeYDNS, kPredictedNTOSSwitchTimeRTNS)
{ {
// TODO:
::Sleep(1); ::Sleep(1);
return; return;
} }
#endif #endif
// Always at least try to burn some cycles off in a spinlock-esc time waster
if ((count < 15) || (forceSpin))
{
count++;
YieldToSharedCore(count);
return;
}
// Always at least try to burn some cycles off in a spinlock-esc time waster
YieldToOtherThread(); YieldToOtherThread();
} }
@ -145,7 +141,7 @@ namespace Aurora::Threading
long count = 0; long count = 0;
unsigned long long a = Time::CurrentClockNS(); unsigned long long a = Time::CurrentInternalClockNS();
do do
{ {
if (permitMultipleContextSwitches) if (permitMultipleContextSwitches)
@ -161,7 +157,7 @@ namespace Aurora::Threading
{ {
return true; return true;
} }
a = Time::CurrentClockNS(); a = Time::CurrentInternalClockNS();
} while ((!timeoutMs) || (timeoutMs > a)); } while ((!timeoutMs) || (timeoutMs > a));
@ -170,7 +166,7 @@ namespace Aurora::Threading
bool YieldPollNs(bool permitMultipleContextSwitches, AuUInt64 timeoutNs, PollCallback_cb cb) bool YieldPollNs(bool permitMultipleContextSwitches, AuUInt64 timeoutNs, PollCallback_cb cb)
{ {
AuUInt64 time = Time::CurrentClockNS(); AuUInt64 time = Time::CurrentInternalClockNS();
if (cb()) if (cb())
{ {
@ -199,7 +195,7 @@ namespace Aurora::Threading
bool YieldPoll(bool permitMultipleContextSwitches, AuUInt64 timeoutMs, PollCallback_cb cb) bool YieldPoll(bool permitMultipleContextSwitches, AuUInt64 timeoutMs, PollCallback_cb cb)
{ {
AuUInt64 time = Time::CurrentClockNS(); AuUInt64 time = Time::CurrentInternalClockNS();
AuUInt64 timeoutNs = timeoutMs ? (time + (timeoutMs * 1000000)) : 0; AuUInt64 timeoutNs = timeoutMs ? (time + (timeoutMs * 1000000)) : 0;
if (cb()) if (cb())