From a7dfd899f89602aebf40fbb6e82099862722ded9 Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Sat, 11 Nov 2023 11:27:01 +0000 Subject: [PATCH] [*] e037fc21 cleanup cont --- Source/Async/Async.hpp | 2 +- Source/Async/AuAsyncKeepGroupAlive.cpp | 3 +- .../{GroupState.cpp => AuGroupState.cpp} | 6 +-- .../{GroupState.hpp => AuGroupState.hpp} | 3 +- Source/Async/AuGroupWorkQueue.cpp | 3 +- Source/Async/AuThreadState.hpp | 12 +++++ Source/Async/ThreadPool.cpp | 46 +++++++++++-------- Source/Async/ThreadState.hpp | 34 ++------------ 8 files changed, 49 insertions(+), 60 deletions(-) rename Source/Async/{GroupState.cpp => AuGroupState.cpp} (96%) rename Source/Async/{GroupState.hpp => AuGroupState.hpp} (97%) diff --git a/Source/Async/Async.hpp b/Source/Async/Async.hpp index 8f6a6d0f..73650abc 100644 --- a/Source/Async/Async.hpp +++ b/Source/Async/Async.hpp @@ -7,7 +7,7 @@ ***/ #pragma once -#include "GroupState.hpp" +#include "AuGroupState.hpp" #include "ThreadState.hpp" #include "AsyncRunnable.hpp" diff --git a/Source/Async/AuAsyncKeepGroupAlive.cpp b/Source/Async/AuAsyncKeepGroupAlive.cpp index b38c866f..c8578280 100644 --- a/Source/Async/AuAsyncKeepGroupAlive.cpp +++ b/Source/Async/AuAsyncKeepGroupAlive.cpp @@ -8,8 +8,7 @@ #include #include "AuAsyncKeepGroupAlive.hpp" #include "ThreadPool.hpp" -#include "ThreadState.hpp" -#include "GroupState.hpp" +#include "Async.hpp" namespace Aurora::Async { diff --git a/Source/Async/GroupState.cpp b/Source/Async/AuGroupState.cpp similarity index 96% rename from Source/Async/GroupState.cpp rename to Source/Async/AuGroupState.cpp index 27c5d5a0..f65025ac 100644 --- a/Source/Async/GroupState.cpp +++ b/Source/Async/AuGroupState.cpp @@ -1,13 +1,13 @@ /*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. - File: GroupState.cpp + File: AuGroupState.cpp Date: 2021-11-1 Author: Reece ***/ #include #include "Async.hpp" -#include "GroupState.hpp" +#include "AuGroupState.hpp" #include namespace Aurora::Async @@ -102,8 +102,8 @@ namespace Aurora::Async if (auto pThreadState = AuMakeShared()) { pThreadState->thread.bOwnsThread = !bCreate; + pThreadState->thread.id = workerId; pThreadState->parent = this->SharedFromThis(); - pThreadState->id = workerId; if (!pThreadState->Init()) { diff --git a/Source/Async/GroupState.hpp b/Source/Async/AuGroupState.hpp similarity index 97% rename from Source/Async/GroupState.hpp rename to Source/Async/AuGroupState.hpp index e8afb33a..a5e4cf5a 100644 --- a/Source/Async/GroupState.hpp +++ b/Source/Async/AuGroupState.hpp @@ -1,7 +1,7 @@ /*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. - File: GroupState.hpp + File: AuGroupState.hpp Date: 2021-11-1 Author: Reece ***/ @@ -14,7 +14,6 @@ namespace Aurora::Async { struct ThreadPool; - struct GroupState : AuEnableSharedFromThis { diff --git a/Source/Async/AuGroupWorkQueue.cpp b/Source/Async/AuGroupWorkQueue.cpp index 1236b3c0..3f3f52fe 100644 --- a/Source/Async/AuGroupWorkQueue.cpp +++ b/Source/Async/AuGroupWorkQueue.cpp @@ -7,8 +7,7 @@ ***/ #include #include "AuGroupWorkQueue.hpp" -#include "ThreadState.hpp" -#include "GroupState.hpp" +#include "Async.hpp" #include "ThreadPool.hpp" namespace Aurora::Async diff --git a/Source/Async/AuThreadState.hpp b/Source/Async/AuThreadState.hpp index 474eda22..eb891c0e 100644 --- a/Source/Async/AuThreadState.hpp +++ b/Source/Async/AuThreadState.hpp @@ -10,11 +10,13 @@ namespace Aurora::Async { struct AsyncLoop; + struct GroupState; struct ThreadState; struct ThreadStateShutdown { bool bBreakMainLoop {}; + bool bDropSubmissions {}; bool bIsKillerThread {}; AuUInt32 uShutdownFence { 1 }; }; @@ -38,6 +40,7 @@ namespace Aurora::Async { bool bOwnsThread {}; AuThreads::ThreadShared_t pThread; + WorkerId_t id; }; struct ThreadStateStack @@ -46,16 +49,25 @@ namespace Aurora::Async AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 }; AuUInt32 uStackCallDepth {}; AuUInt32 uStackMaxCookie { 5 }; + AuUInt8 uWorkMultipopCount { 32 }; + }; + struct ThreadStateFeatureCallbacks + { + AuMutex mutex; + AuList> features; }; struct ThreadStateBase { + AuWPtr parent; + ///////////////////////////////// ThreadStateShutdown shutdown; ThreadStateMeta thread; ThreadStateSync sync; AuSPtr asyncLoop; ThreadStateStack stackState; + ThreadStateFeatureCallbacks tlsFeatures; bool Init(); }; diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 152cc7ac..cf38b5e4 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -70,9 +70,9 @@ namespace Aurora::Async return Threading::WaitFor(primitive.get(), timeoutMs); } - bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1)); + bool workerIdMatches = (unlocker.second == curThread->thread.id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1)); - if ((unlocker.first == curThread->id.first) && + if ((unlocker.first == curThread->thread.id.first) && (unlocker.pool.get() == this) && // work group matches (workerIdMatches)) // well, crap { @@ -144,6 +144,12 @@ namespace Aurora::Async return; } + if (pWorker->shutdown.bDropSubmissions) + { + runnable->CancelAsync(); + return; + } + if (bIncrement) { AuAtomicAdd(&this->uAtomicCounter, 1u); @@ -296,7 +302,7 @@ namespace Aurora::Async do { - group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second); + group->workQueue.Dequeue(state->pendingWorkItems, state->stackState.uWorkMultipopCount, state->thread.id.second); state->sync.UpdateCVState(state.get()); @@ -351,7 +357,7 @@ namespace Aurora::Async // ...these primitives are far less expensive to hit than resetting kernel primitives // AU_LOCK_GUARD(state->cvWorkMutex) used to protect us - if (group->workQueue.IsEmpty(this, state->id)) + if (group->workQueue.IsEmpty(this, state->thread.id)) { state->sync.eventLs->Reset(); // ...until we're done AuAtomicStore(&state->sync.cvLSActive, 0u); @@ -487,12 +493,12 @@ namespace Aurora::Async AU_LOCK_GUARD(pGroup->workersMutex); for (auto &[id, worker] : pGroup->workers) { - if (trySelfPid == worker->id) + if (trySelfPid == worker->thread.id) { continue; } - toBarrier.push_back(worker->id); + toBarrier.push_back(worker->thread.id); } } @@ -588,7 +594,7 @@ namespace Aurora::Async auto handle = this->GetThreadHandle(id); if (handle) { - handle->rejecting = false; + handle->shutdown.bDropSubmissions = false; handle->isDeadEvent->LockMS(250); } } @@ -717,7 +723,7 @@ namespace Aurora::Async for (const auto &thread : pGroup->workers) { - workers.push_back(thread.second->id.second); + workers.push_back(thread.second->thread.id.second); } ret[pGroup->group] = workers; @@ -742,7 +748,7 @@ namespace Aurora::Async { for (auto &jobWorker : group->workers) { - if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min? + if (!Barrier(jobWorker.second->thread.id, timeoutMs, requireSignal && jobWorker.second->thread.id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min? { return false; } @@ -800,7 +806,7 @@ namespace Aurora::Async for (auto &jobWorker : pGroup->workers) { - SysAssert(Barrier(jobWorker.second->id, 0, false, false)); + SysAssert(Barrier(jobWorker.second->thread.id, 0, false, false)); } } } @@ -812,8 +818,10 @@ namespace Aurora::Async auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]() { auto pState = GetThreadState(); - AU_LOCK_GUARD(pState->featuresMutex); - pState->features.push_back(pFeature); + { + AU_LOCK_GUARD(pState->tlsFeatures.mutex); + pState->tlsFeatures.features.push_back(pFeature); + } pFeature->Init(); }); @@ -921,7 +929,7 @@ namespace Aurora::Async if (this->shuttingdown_ & 2) // fast { - if (pA->rejecting) + if (pA->shutdown.bDropSubmissions) { return false; } @@ -1103,7 +1111,7 @@ namespace Aurora::Async if (drop) { - state->rejecting = true; + state->shutdown.bDropSubmissions = true; } if (requireSignal) @@ -1152,7 +1160,7 @@ namespace Aurora::Async { AU_LOCK_GUARD(this->pRWReadView); - if (!this->shuttingdown_ && !job->rejecting) + if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions) { // Pump and barrier + reject all after atomically Barrier(id, 0, false, true); @@ -1202,8 +1210,8 @@ namespace Aurora::Async AuList> features; { - AU_LOCK_GUARD(jobWorker->featuresMutex); - features = AuExchange(jobWorker->features, {}); + AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex); + features = AuExchange(jobWorker->tlsFeatures.features, {}); } { @@ -1258,8 +1266,8 @@ namespace Aurora::Async } { - AU_LOCK_GUARD(pLocalState->featuresMutex); - features = AuExchange(pLocalState->features, {}); + AU_LOCK_GUARD(pLocalState->tlsFeatures.mutex); + features = AuExchange(pLocalState->tlsFeatures.features, {}); } } diff --git a/Source/Async/ThreadState.hpp b/Source/Async/ThreadState.hpp index b349df81..88b838e3 100644 --- a/Source/Async/ThreadState.hpp +++ b/Source/Async/ThreadState.hpp @@ -13,54 +13,26 @@ namespace Aurora::Async { - // TODO: this is a hack because i havent implemented an epoll abstraction yet - struct AsyncAppWaitSourceRequest - { - AuConsumer, bool> callback; - AuSPtr loopSource; - AuUInt32 requestedOffset; - AuUInt64 startTime; - AuUInt64 endTime; - }; - struct GroupState; struct AsyncLoop; struct ThreadState : ThreadStateBase { - ThreadState() : running(true, false, true), + ThreadState() : + running(true, false, true), isDeadEvent(false, false, true) - { - - } + { } // :vomit: - WorkerId_t id; - AuUInt8 multipopCount = 32; - AuWPtr parent; AuThreadPrimitives::Semaphore syncSema; AuThreadPrimitives::Event isDeadEvent; - AuMutex featuresMutex; - AuList> features; - bool rejecting {}; bool exiting {}; bool shuttingdown {}; bool exitingflag2 {}; AuThreadPrimitives::Event running; - //bool running; - AuList loopSources; AuList pendingWorkItems; - int cookie {0}; bool bAlreadyDoingExitTick {}; - - // AuThreadPrimitives::SpinLock externalFencesLock; AuList externalFences; - - // - - - - // }; } \ No newline at end of file