[*] e037fc21 cleanup cont

This commit is contained in:
Reece Wilson 2023-11-11 11:27:01 +00:00
parent e037fc214a
commit a7dfd899f8
8 changed files with 49 additions and 60 deletions

View File

@ -7,7 +7,7 @@
***/
#pragma once
#include "GroupState.hpp"
#include "AuGroupState.hpp"
#include "ThreadState.hpp"
#include "AsyncRunnable.hpp"

View File

@ -8,8 +8,7 @@
#include <Source/RuntimeInternal.hpp>
#include "AuAsyncKeepGroupAlive.hpp"
#include "ThreadPool.hpp"
#include "ThreadState.hpp"
#include "GroupState.hpp"
#include "Async.hpp"
namespace Aurora::Async
{

View File

@ -1,13 +1,13 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: GroupState.cpp
File: AuGroupState.cpp
Date: 2021-11-1
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Async.hpp"
#include "GroupState.hpp"
#include "AuGroupState.hpp"
#include <Source/IO/Loop/LSAsync.hpp>
namespace Aurora::Async
@ -102,8 +102,8 @@ namespace Aurora::Async
if (auto pThreadState = AuMakeShared<ThreadState>())
{
pThreadState->thread.bOwnsThread = !bCreate;
pThreadState->thread.id = workerId;
pThreadState->parent = this->SharedFromThis();
pThreadState->id = workerId;
if (!pThreadState->Init())
{

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: GroupState.hpp
File: AuGroupState.hpp
Date: 2021-11-1
Author: Reece
***/
@ -14,7 +14,6 @@ namespace Aurora::Async
{
struct ThreadPool;
struct GroupState :
AuEnableSharedFromThis<GroupState>
{

View File

@ -7,8 +7,7 @@
***/
#include <Source/RuntimeInternal.hpp>
#include "AuGroupWorkQueue.hpp"
#include "ThreadState.hpp"
#include "GroupState.hpp"
#include "Async.hpp"
#include "ThreadPool.hpp"
namespace Aurora::Async

View File

@ -10,11 +10,13 @@
namespace Aurora::Async
{
struct AsyncLoop;
struct GroupState;
struct ThreadState;
struct ThreadStateShutdown
{
bool bBreakMainLoop {};
bool bDropSubmissions {};
bool bIsKillerThread {};
AuUInt32 uShutdownFence { 1 };
};
@ -38,6 +40,7 @@ namespace Aurora::Async
{
bool bOwnsThread {};
AuThreads::ThreadShared_t pThread;
WorkerId_t id;
};
struct ThreadStateStack
@ -46,16 +49,25 @@ namespace Aurora::Async
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
AuUInt32 uStackCallDepth {};
AuUInt32 uStackMaxCookie { 5 };
AuUInt8 uWorkMultipopCount { 32 };
};
struct ThreadStateFeatureCallbacks
{
AuMutex mutex;
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
};
struct ThreadStateBase
{
AuWPtr<GroupState> parent;
/////////////////////////////////
ThreadStateShutdown shutdown;
ThreadStateMeta thread;
ThreadStateSync sync;
AuSPtr<AsyncLoop> asyncLoop;
ThreadStateStack stackState;
ThreadStateFeatureCallbacks tlsFeatures;
bool Init();
};

View File

@ -70,9 +70,9 @@ namespace Aurora::Async
return Threading::WaitFor(primitive.get(), timeoutMs);
}
bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
bool workerIdMatches = (unlocker.second == curThread->thread.id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
if ((unlocker.first == curThread->id.first) &&
if ((unlocker.first == curThread->thread.id.first) &&
(unlocker.pool.get() == this) && // work group matches
(workerIdMatches)) // well, crap
{
@ -144,6 +144,12 @@ namespace Aurora::Async
return;
}
if (pWorker->shutdown.bDropSubmissions)
{
runnable->CancelAsync();
return;
}
if (bIncrement)
{
AuAtomicAdd(&this->uAtomicCounter, 1u);
@ -296,7 +302,7 @@ namespace Aurora::Async
do
{
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
group->workQueue.Dequeue(state->pendingWorkItems, state->stackState.uWorkMultipopCount, state->thread.id.second);
state->sync.UpdateCVState(state.get());
@ -351,7 +357,7 @@ namespace Aurora::Async
// ...these primitives are far less expensive to hit than resetting kernel primitives
// AU_LOCK_GUARD(state->cvWorkMutex) used to protect us
if (group->workQueue.IsEmpty(this, state->id))
if (group->workQueue.IsEmpty(this, state->thread.id))
{
state->sync.eventLs->Reset(); // ...until we're done
AuAtomicStore(&state->sync.cvLSActive, 0u);
@ -487,12 +493,12 @@ namespace Aurora::Async
AU_LOCK_GUARD(pGroup->workersMutex);
for (auto &[id, worker] : pGroup->workers)
{
if (trySelfPid == worker->id)
if (trySelfPid == worker->thread.id)
{
continue;
}
toBarrier.push_back(worker->id);
toBarrier.push_back(worker->thread.id);
}
}
@ -588,7 +594,7 @@ namespace Aurora::Async
auto handle = this->GetThreadHandle(id);
if (handle)
{
handle->rejecting = false;
handle->shutdown.bDropSubmissions = false;
handle->isDeadEvent->LockMS(250);
}
}
@ -717,7 +723,7 @@ namespace Aurora::Async
for (const auto &thread : pGroup->workers)
{
workers.push_back(thread.second->id.second);
workers.push_back(thread.second->thread.id.second);
}
ret[pGroup->group] = workers;
@ -742,7 +748,7 @@ namespace Aurora::Async
{
for (auto &jobWorker : group->workers)
{
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
if (!Barrier(jobWorker.second->thread.id, timeoutMs, requireSignal && jobWorker.second->thread.id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
{
return false;
}
@ -800,7 +806,7 @@ namespace Aurora::Async
for (auto &jobWorker : pGroup->workers)
{
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
SysAssert(Barrier(jobWorker.second->thread.id, 0, false, false));
}
}
}
@ -812,8 +818,10 @@ namespace Aurora::Async
auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]()
{
auto pState = GetThreadState();
AU_LOCK_GUARD(pState->featuresMutex);
pState->features.push_back(pFeature);
{
AU_LOCK_GUARD(pState->tlsFeatures.mutex);
pState->tlsFeatures.features.push_back(pFeature);
}
pFeature->Init();
});
@ -921,7 +929,7 @@ namespace Aurora::Async
if (this->shuttingdown_ & 2) // fast
{
if (pA->rejecting)
if (pA->shutdown.bDropSubmissions)
{
return false;
}
@ -1103,7 +1111,7 @@ namespace Aurora::Async
if (drop)
{
state->rejecting = true;
state->shutdown.bDropSubmissions = true;
}
if (requireSignal)
@ -1152,7 +1160,7 @@ namespace Aurora::Async
{
AU_LOCK_GUARD(this->pRWReadView);
if (!this->shuttingdown_ && !job->rejecting)
if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions)
{
// Pump and barrier + reject all after atomically
Barrier(id, 0, false, true);
@ -1202,8 +1210,8 @@ namespace Aurora::Async
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
{
AU_LOCK_GUARD(jobWorker->featuresMutex);
features = AuExchange(jobWorker->features, {});
AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex);
features = AuExchange(jobWorker->tlsFeatures.features, {});
}
{
@ -1258,8 +1266,8 @@ namespace Aurora::Async
}
{
AU_LOCK_GUARD(pLocalState->featuresMutex);
features = AuExchange(pLocalState->features, {});
AU_LOCK_GUARD(pLocalState->tlsFeatures.mutex);
features = AuExchange(pLocalState->tlsFeatures.features, {});
}
}

View File

@ -13,54 +13,26 @@
namespace Aurora::Async
{
// TODO: this is a hack because i havent implemented an epoll abstraction yet
struct AsyncAppWaitSourceRequest
{
AuConsumer<AuSPtr<AuLoop::ILoopSource>, bool> callback;
AuSPtr<AuLoop::ILoopSource> loopSource;
AuUInt32 requestedOffset;
AuUInt64 startTime;
AuUInt64 endTime;
};
struct GroupState;
struct AsyncLoop;
struct ThreadState : ThreadStateBase
{
ThreadState() : running(true, false, true),
ThreadState() :
running(true, false, true),
isDeadEvent(false, false, true)
{
}
{ }
// :vomit:
WorkerId_t id;
AuUInt8 multipopCount = 32;
AuWPtr<GroupState> parent;
AuThreadPrimitives::Semaphore syncSema;
AuThreadPrimitives::Event isDeadEvent;
AuMutex featuresMutex;
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
bool rejecting {};
bool exiting {};
bool shuttingdown {};
bool exitingflag2 {};
AuThreadPrimitives::Event running;
//bool running;
AuList<AsyncAppWaitSourceRequest> loopSources;
AuList<WorkEntry_t> pendingWorkItems;
int cookie {0};
bool bAlreadyDoingExitTick {};
//
AuThreadPrimitives::SpinLock externalFencesLock;
AuList<AuThreading::IWaitable *> externalFences;
//
//
};
}