[*] e037fc21 cleanup cont

This commit is contained in:
Reece Wilson 2023-11-11 11:27:01 +00:00
parent e037fc214a
commit a7dfd899f8
8 changed files with 49 additions and 60 deletions

View File

@ -7,7 +7,7 @@
***/ ***/
#pragma once #pragma once
#include "GroupState.hpp" #include "AuGroupState.hpp"
#include "ThreadState.hpp" #include "ThreadState.hpp"
#include "AsyncRunnable.hpp" #include "AsyncRunnable.hpp"

View File

@ -8,8 +8,7 @@
#include <Source/RuntimeInternal.hpp> #include <Source/RuntimeInternal.hpp>
#include "AuAsyncKeepGroupAlive.hpp" #include "AuAsyncKeepGroupAlive.hpp"
#include "ThreadPool.hpp" #include "ThreadPool.hpp"
#include "ThreadState.hpp" #include "Async.hpp"
#include "GroupState.hpp"
namespace Aurora::Async namespace Aurora::Async
{ {

View File

@ -1,13 +1,13 @@
/*** /***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: GroupState.cpp File: AuGroupState.cpp
Date: 2021-11-1 Date: 2021-11-1
Author: Reece Author: Reece
***/ ***/
#include <Source/RuntimeInternal.hpp> #include <Source/RuntimeInternal.hpp>
#include "Async.hpp" #include "Async.hpp"
#include "GroupState.hpp" #include "AuGroupState.hpp"
#include <Source/IO/Loop/LSAsync.hpp> #include <Source/IO/Loop/LSAsync.hpp>
namespace Aurora::Async namespace Aurora::Async
@ -102,8 +102,8 @@ namespace Aurora::Async
if (auto pThreadState = AuMakeShared<ThreadState>()) if (auto pThreadState = AuMakeShared<ThreadState>())
{ {
pThreadState->thread.bOwnsThread = !bCreate; pThreadState->thread.bOwnsThread = !bCreate;
pThreadState->thread.id = workerId;
pThreadState->parent = this->SharedFromThis(); pThreadState->parent = this->SharedFromThis();
pThreadState->id = workerId;
if (!pThreadState->Init()) if (!pThreadState->Init())
{ {

View File

@ -1,7 +1,7 @@
/*** /***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: GroupState.hpp File: AuGroupState.hpp
Date: 2021-11-1 Date: 2021-11-1
Author: Reece Author: Reece
***/ ***/
@ -14,7 +14,6 @@ namespace Aurora::Async
{ {
struct ThreadPool; struct ThreadPool;
struct GroupState : struct GroupState :
AuEnableSharedFromThis<GroupState> AuEnableSharedFromThis<GroupState>
{ {

View File

@ -7,8 +7,7 @@
***/ ***/
#include <Source/RuntimeInternal.hpp> #include <Source/RuntimeInternal.hpp>
#include "AuGroupWorkQueue.hpp" #include "AuGroupWorkQueue.hpp"
#include "ThreadState.hpp" #include "Async.hpp"
#include "GroupState.hpp"
#include "ThreadPool.hpp" #include "ThreadPool.hpp"
namespace Aurora::Async namespace Aurora::Async

View File

@ -10,11 +10,13 @@
namespace Aurora::Async namespace Aurora::Async
{ {
struct AsyncLoop; struct AsyncLoop;
struct GroupState;
struct ThreadState; struct ThreadState;
struct ThreadStateShutdown struct ThreadStateShutdown
{ {
bool bBreakMainLoop {}; bool bBreakMainLoop {};
bool bDropSubmissions {};
bool bIsKillerThread {}; bool bIsKillerThread {};
AuUInt32 uShutdownFence { 1 }; AuUInt32 uShutdownFence { 1 };
}; };
@ -38,6 +40,7 @@ namespace Aurora::Async
{ {
bool bOwnsThread {}; bool bOwnsThread {};
AuThreads::ThreadShared_t pThread; AuThreads::ThreadShared_t pThread;
WorkerId_t id;
}; };
struct ThreadStateStack struct ThreadStateStack
@ -46,16 +49,25 @@ namespace Aurora::Async
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 }; AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
AuUInt32 uStackCallDepth {}; AuUInt32 uStackCallDepth {};
AuUInt32 uStackMaxCookie { 5 }; AuUInt32 uStackMaxCookie { 5 };
AuUInt8 uWorkMultipopCount { 32 };
};
struct ThreadStateFeatureCallbacks
{
AuMutex mutex;
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
}; };
struct ThreadStateBase struct ThreadStateBase
{ {
AuWPtr<GroupState> parent;
/////////////////////////////////
ThreadStateShutdown shutdown; ThreadStateShutdown shutdown;
ThreadStateMeta thread; ThreadStateMeta thread;
ThreadStateSync sync; ThreadStateSync sync;
AuSPtr<AsyncLoop> asyncLoop; AuSPtr<AsyncLoop> asyncLoop;
ThreadStateStack stackState; ThreadStateStack stackState;
ThreadStateFeatureCallbacks tlsFeatures;
bool Init(); bool Init();
}; };

View File

@ -70,9 +70,9 @@ namespace Aurora::Async
return Threading::WaitFor(primitive.get(), timeoutMs); return Threading::WaitFor(primitive.get(), timeoutMs);
} }
bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1)); bool workerIdMatches = (unlocker.second == curThread->thread.id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
if ((unlocker.first == curThread->id.first) && if ((unlocker.first == curThread->thread.id.first) &&
(unlocker.pool.get() == this) && // work group matches (unlocker.pool.get() == this) && // work group matches
(workerIdMatches)) // well, crap (workerIdMatches)) // well, crap
{ {
@ -144,6 +144,12 @@ namespace Aurora::Async
return; return;
} }
if (pWorker->shutdown.bDropSubmissions)
{
runnable->CancelAsync();
return;
}
if (bIncrement) if (bIncrement)
{ {
AuAtomicAdd(&this->uAtomicCounter, 1u); AuAtomicAdd(&this->uAtomicCounter, 1u);
@ -296,7 +302,7 @@ namespace Aurora::Async
do do
{ {
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second); group->workQueue.Dequeue(state->pendingWorkItems, state->stackState.uWorkMultipopCount, state->thread.id.second);
state->sync.UpdateCVState(state.get()); state->sync.UpdateCVState(state.get());
@ -351,7 +357,7 @@ namespace Aurora::Async
// ...these primitives are far less expensive to hit than resetting kernel primitives // ...these primitives are far less expensive to hit than resetting kernel primitives
// AU_LOCK_GUARD(state->cvWorkMutex) used to protect us // AU_LOCK_GUARD(state->cvWorkMutex) used to protect us
if (group->workQueue.IsEmpty(this, state->id)) if (group->workQueue.IsEmpty(this, state->thread.id))
{ {
state->sync.eventLs->Reset(); // ...until we're done state->sync.eventLs->Reset(); // ...until we're done
AuAtomicStore(&state->sync.cvLSActive, 0u); AuAtomicStore(&state->sync.cvLSActive, 0u);
@ -487,12 +493,12 @@ namespace Aurora::Async
AU_LOCK_GUARD(pGroup->workersMutex); AU_LOCK_GUARD(pGroup->workersMutex);
for (auto &[id, worker] : pGroup->workers) for (auto &[id, worker] : pGroup->workers)
{ {
if (trySelfPid == worker->id) if (trySelfPid == worker->thread.id)
{ {
continue; continue;
} }
toBarrier.push_back(worker->id); toBarrier.push_back(worker->thread.id);
} }
} }
@ -588,7 +594,7 @@ namespace Aurora::Async
auto handle = this->GetThreadHandle(id); auto handle = this->GetThreadHandle(id);
if (handle) if (handle)
{ {
handle->rejecting = false; handle->shutdown.bDropSubmissions = false;
handle->isDeadEvent->LockMS(250); handle->isDeadEvent->LockMS(250);
} }
} }
@ -717,7 +723,7 @@ namespace Aurora::Async
for (const auto &thread : pGroup->workers) for (const auto &thread : pGroup->workers)
{ {
workers.push_back(thread.second->id.second); workers.push_back(thread.second->thread.id.second);
} }
ret[pGroup->group] = workers; ret[pGroup->group] = workers;
@ -742,7 +748,7 @@ namespace Aurora::Async
{ {
for (auto &jobWorker : group->workers) for (auto &jobWorker : group->workers)
{ {
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min? if (!Barrier(jobWorker.second->thread.id, timeoutMs, requireSignal && jobWorker.second->thread.id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
{ {
return false; return false;
} }
@ -800,7 +806,7 @@ namespace Aurora::Async
for (auto &jobWorker : pGroup->workers) for (auto &jobWorker : pGroup->workers)
{ {
SysAssert(Barrier(jobWorker.second->id, 0, false, false)); SysAssert(Barrier(jobWorker.second->thread.id, 0, false, false));
} }
} }
} }
@ -812,8 +818,10 @@ namespace Aurora::Async
auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]() auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]()
{ {
auto pState = GetThreadState(); auto pState = GetThreadState();
AU_LOCK_GUARD(pState->featuresMutex); {
pState->features.push_back(pFeature); AU_LOCK_GUARD(pState->tlsFeatures.mutex);
pState->tlsFeatures.features.push_back(pFeature);
}
pFeature->Init(); pFeature->Init();
}); });
@ -921,7 +929,7 @@ namespace Aurora::Async
if (this->shuttingdown_ & 2) // fast if (this->shuttingdown_ & 2) // fast
{ {
if (pA->rejecting) if (pA->shutdown.bDropSubmissions)
{ {
return false; return false;
} }
@ -1103,7 +1111,7 @@ namespace Aurora::Async
if (drop) if (drop)
{ {
state->rejecting = true; state->shutdown.bDropSubmissions = true;
} }
if (requireSignal) if (requireSignal)
@ -1152,7 +1160,7 @@ namespace Aurora::Async
{ {
AU_LOCK_GUARD(this->pRWReadView); AU_LOCK_GUARD(this->pRWReadView);
if (!this->shuttingdown_ && !job->rejecting) if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions)
{ {
// Pump and barrier + reject all after atomically // Pump and barrier + reject all after atomically
Barrier(id, 0, false, true); Barrier(id, 0, false, true);
@ -1202,8 +1210,8 @@ namespace Aurora::Async
AuList<AuSPtr<AuThreads::IThreadFeature>> features; AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{ {
AU_LOCK_GUARD(jobWorker->featuresMutex); AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex);
features = AuExchange(jobWorker->features, {}); features = AuExchange(jobWorker->tlsFeatures.features, {});
} }
{ {
@ -1258,8 +1266,8 @@ namespace Aurora::Async
} }
{ {
AU_LOCK_GUARD(pLocalState->featuresMutex); AU_LOCK_GUARD(pLocalState->tlsFeatures.mutex);
features = AuExchange(pLocalState->features, {}); features = AuExchange(pLocalState->tlsFeatures.features, {});
} }
} }

View File

@ -13,54 +13,26 @@
namespace Aurora::Async namespace Aurora::Async
{ {
// TODO: this is a hack because i havent implemented an epoll abstraction yet
struct AsyncAppWaitSourceRequest
{
AuConsumer<AuSPtr<AuLoop::ILoopSource>, bool> callback;
AuSPtr<AuLoop::ILoopSource> loopSource;
AuUInt32 requestedOffset;
AuUInt64 startTime;
AuUInt64 endTime;
};
struct GroupState; struct GroupState;
struct AsyncLoop; struct AsyncLoop;
struct ThreadState : ThreadStateBase struct ThreadState : ThreadStateBase
{ {
ThreadState() : running(true, false, true), ThreadState() :
running(true, false, true),
isDeadEvent(false, false, true) isDeadEvent(false, false, true)
{ { }
}
// :vomit: // :vomit:
WorkerId_t id;
AuUInt8 multipopCount = 32;
AuWPtr<GroupState> parent;
AuThreadPrimitives::Semaphore syncSema; AuThreadPrimitives::Semaphore syncSema;
AuThreadPrimitives::Event isDeadEvent; AuThreadPrimitives::Event isDeadEvent;
AuMutex featuresMutex;
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
bool rejecting {};
bool exiting {}; bool exiting {};
bool shuttingdown {}; bool shuttingdown {};
bool exitingflag2 {}; bool exitingflag2 {};
AuThreadPrimitives::Event running; AuThreadPrimitives::Event running;
//bool running;
AuList<AsyncAppWaitSourceRequest> loopSources;
AuList<WorkEntry_t> pendingWorkItems; AuList<WorkEntry_t> pendingWorkItems;
int cookie {0};
bool bAlreadyDoingExitTick {}; bool bAlreadyDoingExitTick {};
//
AuThreadPrimitives::SpinLock externalFencesLock; AuThreadPrimitives::SpinLock externalFencesLock;
AuList<AuThreading::IWaitable *> externalFences; AuList<AuThreading::IWaitable *> externalFences;
//
//
}; };
} }