[*] e037fc21
cleanup cont
This commit is contained in:
parent
e037fc214a
commit
a7dfd899f8
@ -7,7 +7,7 @@
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
#include "GroupState.hpp"
|
||||
#include "AuGroupState.hpp"
|
||||
#include "ThreadState.hpp"
|
||||
#include "AsyncRunnable.hpp"
|
||||
|
||||
|
@ -8,8 +8,7 @@
|
||||
#include <Source/RuntimeInternal.hpp>
|
||||
#include "AuAsyncKeepGroupAlive.hpp"
|
||||
#include "ThreadPool.hpp"
|
||||
#include "ThreadState.hpp"
|
||||
#include "GroupState.hpp"
|
||||
#include "Async.hpp"
|
||||
|
||||
namespace Aurora::Async
|
||||
{
|
||||
|
@ -1,13 +1,13 @@
|
||||
/***
|
||||
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: GroupState.cpp
|
||||
File: AuGroupState.cpp
|
||||
Date: 2021-11-1
|
||||
Author: Reece
|
||||
***/
|
||||
#include <Source/RuntimeInternal.hpp>
|
||||
#include "Async.hpp"
|
||||
#include "GroupState.hpp"
|
||||
#include "AuGroupState.hpp"
|
||||
#include <Source/IO/Loop/LSAsync.hpp>
|
||||
|
||||
namespace Aurora::Async
|
||||
@ -102,8 +102,8 @@ namespace Aurora::Async
|
||||
if (auto pThreadState = AuMakeShared<ThreadState>())
|
||||
{
|
||||
pThreadState->thread.bOwnsThread = !bCreate;
|
||||
pThreadState->thread.id = workerId;
|
||||
pThreadState->parent = this->SharedFromThis();
|
||||
pThreadState->id = workerId;
|
||||
|
||||
if (!pThreadState->Init())
|
||||
{
|
@ -1,7 +1,7 @@
|
||||
/***
|
||||
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: GroupState.hpp
|
||||
File: AuGroupState.hpp
|
||||
Date: 2021-11-1
|
||||
Author: Reece
|
||||
***/
|
||||
@ -14,7 +14,6 @@ namespace Aurora::Async
|
||||
{
|
||||
struct ThreadPool;
|
||||
|
||||
|
||||
struct GroupState :
|
||||
AuEnableSharedFromThis<GroupState>
|
||||
{
|
@ -7,8 +7,7 @@
|
||||
***/
|
||||
#include <Source/RuntimeInternal.hpp>
|
||||
#include "AuGroupWorkQueue.hpp"
|
||||
#include "ThreadState.hpp"
|
||||
#include "GroupState.hpp"
|
||||
#include "Async.hpp"
|
||||
#include "ThreadPool.hpp"
|
||||
|
||||
namespace Aurora::Async
|
||||
|
@ -10,11 +10,13 @@
|
||||
namespace Aurora::Async
|
||||
{
|
||||
struct AsyncLoop;
|
||||
struct GroupState;
|
||||
struct ThreadState;
|
||||
|
||||
struct ThreadStateShutdown
|
||||
{
|
||||
bool bBreakMainLoop {};
|
||||
bool bDropSubmissions {};
|
||||
bool bIsKillerThread {};
|
||||
AuUInt32 uShutdownFence { 1 };
|
||||
};
|
||||
@ -38,6 +40,7 @@ namespace Aurora::Async
|
||||
{
|
||||
bool bOwnsThread {};
|
||||
AuThreads::ThreadShared_t pThread;
|
||||
WorkerId_t id;
|
||||
};
|
||||
|
||||
struct ThreadStateStack
|
||||
@ -46,16 +49,25 @@ namespace Aurora::Async
|
||||
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
|
||||
AuUInt32 uStackCallDepth {};
|
||||
AuUInt32 uStackMaxCookie { 5 };
|
||||
AuUInt8 uWorkMultipopCount { 32 };
|
||||
};
|
||||
|
||||
struct ThreadStateFeatureCallbacks
|
||||
{
|
||||
AuMutex mutex;
|
||||
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
||||
};
|
||||
|
||||
struct ThreadStateBase
|
||||
{
|
||||
AuWPtr<GroupState> parent;
|
||||
/////////////////////////////////
|
||||
ThreadStateShutdown shutdown;
|
||||
ThreadStateMeta thread;
|
||||
ThreadStateSync sync;
|
||||
AuSPtr<AsyncLoop> asyncLoop;
|
||||
ThreadStateStack stackState;
|
||||
ThreadStateFeatureCallbacks tlsFeatures;
|
||||
|
||||
bool Init();
|
||||
};
|
||||
|
@ -70,9 +70,9 @@ namespace Aurora::Async
|
||||
return Threading::WaitFor(primitive.get(), timeoutMs);
|
||||
}
|
||||
|
||||
bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
|
||||
bool workerIdMatches = (unlocker.second == curThread->thread.id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
|
||||
|
||||
if ((unlocker.first == curThread->id.first) &&
|
||||
if ((unlocker.first == curThread->thread.id.first) &&
|
||||
(unlocker.pool.get() == this) && // work group matches
|
||||
(workerIdMatches)) // well, crap
|
||||
{
|
||||
@ -144,6 +144,12 @@ namespace Aurora::Async
|
||||
return;
|
||||
}
|
||||
|
||||
if (pWorker->shutdown.bDropSubmissions)
|
||||
{
|
||||
runnable->CancelAsync();
|
||||
return;
|
||||
}
|
||||
|
||||
if (bIncrement)
|
||||
{
|
||||
AuAtomicAdd(&this->uAtomicCounter, 1u);
|
||||
@ -296,7 +302,7 @@ namespace Aurora::Async
|
||||
|
||||
do
|
||||
{
|
||||
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
|
||||
group->workQueue.Dequeue(state->pendingWorkItems, state->stackState.uWorkMultipopCount, state->thread.id.second);
|
||||
|
||||
state->sync.UpdateCVState(state.get());
|
||||
|
||||
@ -351,7 +357,7 @@ namespace Aurora::Async
|
||||
// ...these primitives are far less expensive to hit than resetting kernel primitives
|
||||
// AU_LOCK_GUARD(state->cvWorkMutex) used to protect us
|
||||
|
||||
if (group->workQueue.IsEmpty(this, state->id))
|
||||
if (group->workQueue.IsEmpty(this, state->thread.id))
|
||||
{
|
||||
state->sync.eventLs->Reset(); // ...until we're done
|
||||
AuAtomicStore(&state->sync.cvLSActive, 0u);
|
||||
@ -487,12 +493,12 @@ namespace Aurora::Async
|
||||
AU_LOCK_GUARD(pGroup->workersMutex);
|
||||
for (auto &[id, worker] : pGroup->workers)
|
||||
{
|
||||
if (trySelfPid == worker->id)
|
||||
if (trySelfPid == worker->thread.id)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
toBarrier.push_back(worker->id);
|
||||
toBarrier.push_back(worker->thread.id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -588,7 +594,7 @@ namespace Aurora::Async
|
||||
auto handle = this->GetThreadHandle(id);
|
||||
if (handle)
|
||||
{
|
||||
handle->rejecting = false;
|
||||
handle->shutdown.bDropSubmissions = false;
|
||||
handle->isDeadEvent->LockMS(250);
|
||||
}
|
||||
}
|
||||
@ -717,7 +723,7 @@ namespace Aurora::Async
|
||||
|
||||
for (const auto &thread : pGroup->workers)
|
||||
{
|
||||
workers.push_back(thread.second->id.second);
|
||||
workers.push_back(thread.second->thread.id.second);
|
||||
}
|
||||
|
||||
ret[pGroup->group] = workers;
|
||||
@ -742,7 +748,7 @@ namespace Aurora::Async
|
||||
{
|
||||
for (auto &jobWorker : group->workers)
|
||||
{
|
||||
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
|
||||
if (!Barrier(jobWorker.second->thread.id, timeoutMs, requireSignal && jobWorker.second->thread.id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@ -800,7 +806,7 @@ namespace Aurora::Async
|
||||
|
||||
for (auto &jobWorker : pGroup->workers)
|
||||
{
|
||||
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
|
||||
SysAssert(Barrier(jobWorker.second->thread.id, 0, false, false));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -812,8 +818,10 @@ namespace Aurora::Async
|
||||
auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]()
|
||||
{
|
||||
auto pState = GetThreadState();
|
||||
AU_LOCK_GUARD(pState->featuresMutex);
|
||||
pState->features.push_back(pFeature);
|
||||
{
|
||||
AU_LOCK_GUARD(pState->tlsFeatures.mutex);
|
||||
pState->tlsFeatures.features.push_back(pFeature);
|
||||
}
|
||||
pFeature->Init();
|
||||
});
|
||||
|
||||
@ -921,7 +929,7 @@ namespace Aurora::Async
|
||||
|
||||
if (this->shuttingdown_ & 2) // fast
|
||||
{
|
||||
if (pA->rejecting)
|
||||
if (pA->shutdown.bDropSubmissions)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@ -1103,7 +1111,7 @@ namespace Aurora::Async
|
||||
|
||||
if (drop)
|
||||
{
|
||||
state->rejecting = true;
|
||||
state->shutdown.bDropSubmissions = true;
|
||||
}
|
||||
|
||||
if (requireSignal)
|
||||
@ -1152,7 +1160,7 @@ namespace Aurora::Async
|
||||
{
|
||||
AU_LOCK_GUARD(this->pRWReadView);
|
||||
|
||||
if (!this->shuttingdown_ && !job->rejecting)
|
||||
if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions)
|
||||
{
|
||||
// Pump and barrier + reject all after atomically
|
||||
Barrier(id, 0, false, true);
|
||||
@ -1202,8 +1210,8 @@ namespace Aurora::Async
|
||||
|
||||
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
||||
{
|
||||
AU_LOCK_GUARD(jobWorker->featuresMutex);
|
||||
features = AuExchange(jobWorker->features, {});
|
||||
AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex);
|
||||
features = AuExchange(jobWorker->tlsFeatures.features, {});
|
||||
}
|
||||
|
||||
{
|
||||
@ -1258,8 +1266,8 @@ namespace Aurora::Async
|
||||
}
|
||||
|
||||
{
|
||||
AU_LOCK_GUARD(pLocalState->featuresMutex);
|
||||
features = AuExchange(pLocalState->features, {});
|
||||
AU_LOCK_GUARD(pLocalState->tlsFeatures.mutex);
|
||||
features = AuExchange(pLocalState->tlsFeatures.features, {});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,54 +13,26 @@
|
||||
|
||||
namespace Aurora::Async
|
||||
{
|
||||
// TODO: this is a hack because i havent implemented an epoll abstraction yet
|
||||
struct AsyncAppWaitSourceRequest
|
||||
{
|
||||
AuConsumer<AuSPtr<AuLoop::ILoopSource>, bool> callback;
|
||||
AuSPtr<AuLoop::ILoopSource> loopSource;
|
||||
AuUInt32 requestedOffset;
|
||||
AuUInt64 startTime;
|
||||
AuUInt64 endTime;
|
||||
};
|
||||
|
||||
struct GroupState;
|
||||
struct AsyncLoop;
|
||||
|
||||
struct ThreadState : ThreadStateBase
|
||||
{
|
||||
ThreadState() : running(true, false, true),
|
||||
ThreadState() :
|
||||
running(true, false, true),
|
||||
isDeadEvent(false, false, true)
|
||||
{
|
||||
|
||||
}
|
||||
{ }
|
||||
|
||||
// :vomit:
|
||||
WorkerId_t id;
|
||||
AuUInt8 multipopCount = 32;
|
||||
AuWPtr<GroupState> parent;
|
||||
AuThreadPrimitives::Semaphore syncSema;
|
||||
AuThreadPrimitives::Event isDeadEvent;
|
||||
AuMutex featuresMutex;
|
||||
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
||||
bool rejecting {};
|
||||
bool exiting {};
|
||||
bool shuttingdown {};
|
||||
bool exitingflag2 {};
|
||||
AuThreadPrimitives::Event running;
|
||||
//bool running;
|
||||
AuList<AsyncAppWaitSourceRequest> loopSources;
|
||||
AuList<WorkEntry_t> pendingWorkItems;
|
||||
int cookie {0};
|
||||
bool bAlreadyDoingExitTick {};
|
||||
|
||||
//
|
||||
AuThreadPrimitives::SpinLock externalFencesLock;
|
||||
AuList<AuThreading::IWaitable *> externalFences;
|
||||
|
||||
//
|
||||
|
||||
|
||||
|
||||
//
|
||||
};
|
||||
}
|
Loading…
Reference in New Issue
Block a user