[*] e037fc21
cleanup cont
This commit is contained in:
parent
e037fc214a
commit
a7dfd899f8
@ -7,7 +7,7 @@
|
|||||||
***/
|
***/
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "GroupState.hpp"
|
#include "AuGroupState.hpp"
|
||||||
#include "ThreadState.hpp"
|
#include "ThreadState.hpp"
|
||||||
#include "AsyncRunnable.hpp"
|
#include "AsyncRunnable.hpp"
|
||||||
|
|
||||||
|
@ -8,8 +8,7 @@
|
|||||||
#include <Source/RuntimeInternal.hpp>
|
#include <Source/RuntimeInternal.hpp>
|
||||||
#include "AuAsyncKeepGroupAlive.hpp"
|
#include "AuAsyncKeepGroupAlive.hpp"
|
||||||
#include "ThreadPool.hpp"
|
#include "ThreadPool.hpp"
|
||||||
#include "ThreadState.hpp"
|
#include "Async.hpp"
|
||||||
#include "GroupState.hpp"
|
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
/***
|
/***
|
||||||
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
File: GroupState.cpp
|
File: AuGroupState.cpp
|
||||||
Date: 2021-11-1
|
Date: 2021-11-1
|
||||||
Author: Reece
|
Author: Reece
|
||||||
***/
|
***/
|
||||||
#include <Source/RuntimeInternal.hpp>
|
#include <Source/RuntimeInternal.hpp>
|
||||||
#include "Async.hpp"
|
#include "Async.hpp"
|
||||||
#include "GroupState.hpp"
|
#include "AuGroupState.hpp"
|
||||||
#include <Source/IO/Loop/LSAsync.hpp>
|
#include <Source/IO/Loop/LSAsync.hpp>
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
@ -102,8 +102,8 @@ namespace Aurora::Async
|
|||||||
if (auto pThreadState = AuMakeShared<ThreadState>())
|
if (auto pThreadState = AuMakeShared<ThreadState>())
|
||||||
{
|
{
|
||||||
pThreadState->thread.bOwnsThread = !bCreate;
|
pThreadState->thread.bOwnsThread = !bCreate;
|
||||||
|
pThreadState->thread.id = workerId;
|
||||||
pThreadState->parent = this->SharedFromThis();
|
pThreadState->parent = this->SharedFromThis();
|
||||||
pThreadState->id = workerId;
|
|
||||||
|
|
||||||
if (!pThreadState->Init())
|
if (!pThreadState->Init())
|
||||||
{
|
{
|
@ -1,7 +1,7 @@
|
|||||||
/***
|
/***
|
||||||
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
File: GroupState.hpp
|
File: AuGroupState.hpp
|
||||||
Date: 2021-11-1
|
Date: 2021-11-1
|
||||||
Author: Reece
|
Author: Reece
|
||||||
***/
|
***/
|
||||||
@ -14,7 +14,6 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
struct ThreadPool;
|
struct ThreadPool;
|
||||||
|
|
||||||
|
|
||||||
struct GroupState :
|
struct GroupState :
|
||||||
AuEnableSharedFromThis<GroupState>
|
AuEnableSharedFromThis<GroupState>
|
||||||
{
|
{
|
@ -7,8 +7,7 @@
|
|||||||
***/
|
***/
|
||||||
#include <Source/RuntimeInternal.hpp>
|
#include <Source/RuntimeInternal.hpp>
|
||||||
#include "AuGroupWorkQueue.hpp"
|
#include "AuGroupWorkQueue.hpp"
|
||||||
#include "ThreadState.hpp"
|
#include "Async.hpp"
|
||||||
#include "GroupState.hpp"
|
|
||||||
#include "ThreadPool.hpp"
|
#include "ThreadPool.hpp"
|
||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
|
@ -10,11 +10,13 @@
|
|||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
struct AsyncLoop;
|
struct AsyncLoop;
|
||||||
|
struct GroupState;
|
||||||
struct ThreadState;
|
struct ThreadState;
|
||||||
|
|
||||||
struct ThreadStateShutdown
|
struct ThreadStateShutdown
|
||||||
{
|
{
|
||||||
bool bBreakMainLoop {};
|
bool bBreakMainLoop {};
|
||||||
|
bool bDropSubmissions {};
|
||||||
bool bIsKillerThread {};
|
bool bIsKillerThread {};
|
||||||
AuUInt32 uShutdownFence { 1 };
|
AuUInt32 uShutdownFence { 1 };
|
||||||
};
|
};
|
||||||
@ -38,6 +40,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
bool bOwnsThread {};
|
bool bOwnsThread {};
|
||||||
AuThreads::ThreadShared_t pThread;
|
AuThreads::ThreadShared_t pThread;
|
||||||
|
WorkerId_t id;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ThreadStateStack
|
struct ThreadStateStack
|
||||||
@ -46,16 +49,25 @@ namespace Aurora::Async
|
|||||||
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
|
AuUInt32 uStackMaxRecursiveAsyncPromiseCalls { 4 };
|
||||||
AuUInt32 uStackCallDepth {};
|
AuUInt32 uStackCallDepth {};
|
||||||
AuUInt32 uStackMaxCookie { 5 };
|
AuUInt32 uStackMaxCookie { 5 };
|
||||||
|
AuUInt8 uWorkMultipopCount { 32 };
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ThreadStateFeatureCallbacks
|
||||||
|
{
|
||||||
|
AuMutex mutex;
|
||||||
|
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ThreadStateBase
|
struct ThreadStateBase
|
||||||
{
|
{
|
||||||
|
AuWPtr<GroupState> parent;
|
||||||
|
/////////////////////////////////
|
||||||
ThreadStateShutdown shutdown;
|
ThreadStateShutdown shutdown;
|
||||||
ThreadStateMeta thread;
|
ThreadStateMeta thread;
|
||||||
ThreadStateSync sync;
|
ThreadStateSync sync;
|
||||||
AuSPtr<AsyncLoop> asyncLoop;
|
AuSPtr<AsyncLoop> asyncLoop;
|
||||||
ThreadStateStack stackState;
|
ThreadStateStack stackState;
|
||||||
|
ThreadStateFeatureCallbacks tlsFeatures;
|
||||||
|
|
||||||
bool Init();
|
bool Init();
|
||||||
};
|
};
|
||||||
|
@ -70,9 +70,9 @@ namespace Aurora::Async
|
|||||||
return Threading::WaitFor(primitive.get(), timeoutMs);
|
return Threading::WaitFor(primitive.get(), timeoutMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool workerIdMatches = (unlocker.second == curThread->id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
|
bool workerIdMatches = (unlocker.second == curThread->thread.id.second) || ((unlocker.second == Async::kThreadIdAny) && (GetThreadWorkersCount(unlocker.first) == 1));
|
||||||
|
|
||||||
if ((unlocker.first == curThread->id.first) &&
|
if ((unlocker.first == curThread->thread.id.first) &&
|
||||||
(unlocker.pool.get() == this) && // work group matches
|
(unlocker.pool.get() == this) && // work group matches
|
||||||
(workerIdMatches)) // well, crap
|
(workerIdMatches)) // well, crap
|
||||||
{
|
{
|
||||||
@ -144,6 +144,12 @@ namespace Aurora::Async
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pWorker->shutdown.bDropSubmissions)
|
||||||
|
{
|
||||||
|
runnable->CancelAsync();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (bIncrement)
|
if (bIncrement)
|
||||||
{
|
{
|
||||||
AuAtomicAdd(&this->uAtomicCounter, 1u);
|
AuAtomicAdd(&this->uAtomicCounter, 1u);
|
||||||
@ -296,7 +302,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
group->workQueue.Dequeue(state->pendingWorkItems, state->multipopCount, state->id.second);
|
group->workQueue.Dequeue(state->pendingWorkItems, state->stackState.uWorkMultipopCount, state->thread.id.second);
|
||||||
|
|
||||||
state->sync.UpdateCVState(state.get());
|
state->sync.UpdateCVState(state.get());
|
||||||
|
|
||||||
@ -351,7 +357,7 @@ namespace Aurora::Async
|
|||||||
// ...these primitives are far less expensive to hit than resetting kernel primitives
|
// ...these primitives are far less expensive to hit than resetting kernel primitives
|
||||||
// AU_LOCK_GUARD(state->cvWorkMutex) used to protect us
|
// AU_LOCK_GUARD(state->cvWorkMutex) used to protect us
|
||||||
|
|
||||||
if (group->workQueue.IsEmpty(this, state->id))
|
if (group->workQueue.IsEmpty(this, state->thread.id))
|
||||||
{
|
{
|
||||||
state->sync.eventLs->Reset(); // ...until we're done
|
state->sync.eventLs->Reset(); // ...until we're done
|
||||||
AuAtomicStore(&state->sync.cvLSActive, 0u);
|
AuAtomicStore(&state->sync.cvLSActive, 0u);
|
||||||
@ -487,12 +493,12 @@ namespace Aurora::Async
|
|||||||
AU_LOCK_GUARD(pGroup->workersMutex);
|
AU_LOCK_GUARD(pGroup->workersMutex);
|
||||||
for (auto &[id, worker] : pGroup->workers)
|
for (auto &[id, worker] : pGroup->workers)
|
||||||
{
|
{
|
||||||
if (trySelfPid == worker->id)
|
if (trySelfPid == worker->thread.id)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
toBarrier.push_back(worker->id);
|
toBarrier.push_back(worker->thread.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -588,7 +594,7 @@ namespace Aurora::Async
|
|||||||
auto handle = this->GetThreadHandle(id);
|
auto handle = this->GetThreadHandle(id);
|
||||||
if (handle)
|
if (handle)
|
||||||
{
|
{
|
||||||
handle->rejecting = false;
|
handle->shutdown.bDropSubmissions = false;
|
||||||
handle->isDeadEvent->LockMS(250);
|
handle->isDeadEvent->LockMS(250);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -717,7 +723,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
for (const auto &thread : pGroup->workers)
|
for (const auto &thread : pGroup->workers)
|
||||||
{
|
{
|
||||||
workers.push_back(thread.second->id.second);
|
workers.push_back(thread.second->thread.id.second);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret[pGroup->group] = workers;
|
ret[pGroup->group] = workers;
|
||||||
@ -742,7 +748,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
for (auto &jobWorker : group->workers)
|
for (auto &jobWorker : group->workers)
|
||||||
{
|
{
|
||||||
if (!Barrier(jobWorker.second->id, timeoutMs, requireSignal && jobWorker.second->id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
|
if (!Barrier(jobWorker.second->thread.id, timeoutMs, requireSignal && jobWorker.second->thread.id.second != currentWorkerId, false)) // BAD!, should subtract time elapsed, clamp to, i dunno, 5ms min?
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -800,7 +806,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
for (auto &jobWorker : pGroup->workers)
|
for (auto &jobWorker : pGroup->workers)
|
||||||
{
|
{
|
||||||
SysAssert(Barrier(jobWorker.second->id, 0, false, false));
|
SysAssert(Barrier(jobWorker.second->thread.id, 0, false, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -812,8 +818,10 @@ namespace Aurora::Async
|
|||||||
auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]()
|
auto pWorkItem = DispatchOn({ this->SharedFromThis(), id }, [=]()
|
||||||
{
|
{
|
||||||
auto pState = GetThreadState();
|
auto pState = GetThreadState();
|
||||||
AU_LOCK_GUARD(pState->featuresMutex);
|
{
|
||||||
pState->features.push_back(pFeature);
|
AU_LOCK_GUARD(pState->tlsFeatures.mutex);
|
||||||
|
pState->tlsFeatures.features.push_back(pFeature);
|
||||||
|
}
|
||||||
pFeature->Init();
|
pFeature->Init();
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -921,7 +929,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (this->shuttingdown_ & 2) // fast
|
if (this->shuttingdown_ & 2) // fast
|
||||||
{
|
{
|
||||||
if (pA->rejecting)
|
if (pA->shutdown.bDropSubmissions)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -1103,7 +1111,7 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
if (drop)
|
if (drop)
|
||||||
{
|
{
|
||||||
state->rejecting = true;
|
state->shutdown.bDropSubmissions = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requireSignal)
|
if (requireSignal)
|
||||||
@ -1152,7 +1160,7 @@ namespace Aurora::Async
|
|||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->pRWReadView);
|
AU_LOCK_GUARD(this->pRWReadView);
|
||||||
|
|
||||||
if (!this->shuttingdown_ && !job->rejecting)
|
if (!this->shuttingdown_ && !job->shutdown.bDropSubmissions)
|
||||||
{
|
{
|
||||||
// Pump and barrier + reject all after atomically
|
// Pump and barrier + reject all after atomically
|
||||||
Barrier(id, 0, false, true);
|
Barrier(id, 0, false, true);
|
||||||
@ -1202,8 +1210,8 @@ namespace Aurora::Async
|
|||||||
|
|
||||||
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(jobWorker->featuresMutex);
|
AU_LOCK_GUARD(jobWorker->tlsFeatures.mutex);
|
||||||
features = AuExchange(jobWorker->features, {});
|
features = AuExchange(jobWorker->tlsFeatures.features, {});
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -1258,8 +1266,8 @@ namespace Aurora::Async
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(pLocalState->featuresMutex);
|
AU_LOCK_GUARD(pLocalState->tlsFeatures.mutex);
|
||||||
features = AuExchange(pLocalState->features, {});
|
features = AuExchange(pLocalState->tlsFeatures.features, {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,54 +13,26 @@
|
|||||||
|
|
||||||
namespace Aurora::Async
|
namespace Aurora::Async
|
||||||
{
|
{
|
||||||
// TODO: this is a hack because i havent implemented an epoll abstraction yet
|
|
||||||
struct AsyncAppWaitSourceRequest
|
|
||||||
{
|
|
||||||
AuConsumer<AuSPtr<AuLoop::ILoopSource>, bool> callback;
|
|
||||||
AuSPtr<AuLoop::ILoopSource> loopSource;
|
|
||||||
AuUInt32 requestedOffset;
|
|
||||||
AuUInt64 startTime;
|
|
||||||
AuUInt64 endTime;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct GroupState;
|
struct GroupState;
|
||||||
struct AsyncLoop;
|
struct AsyncLoop;
|
||||||
|
|
||||||
struct ThreadState : ThreadStateBase
|
struct ThreadState : ThreadStateBase
|
||||||
{
|
{
|
||||||
ThreadState() : running(true, false, true),
|
ThreadState() :
|
||||||
|
running(true, false, true),
|
||||||
isDeadEvent(false, false, true)
|
isDeadEvent(false, false, true)
|
||||||
{
|
{ }
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// :vomit:
|
// :vomit:
|
||||||
WorkerId_t id;
|
|
||||||
AuUInt8 multipopCount = 32;
|
|
||||||
AuWPtr<GroupState> parent;
|
|
||||||
AuThreadPrimitives::Semaphore syncSema;
|
AuThreadPrimitives::Semaphore syncSema;
|
||||||
AuThreadPrimitives::Event isDeadEvent;
|
AuThreadPrimitives::Event isDeadEvent;
|
||||||
AuMutex featuresMutex;
|
|
||||||
AuList<AuSPtr<AuThreads::IThreadFeature>> features;
|
|
||||||
bool rejecting {};
|
|
||||||
bool exiting {};
|
bool exiting {};
|
||||||
bool shuttingdown {};
|
bool shuttingdown {};
|
||||||
bool exitingflag2 {};
|
bool exitingflag2 {};
|
||||||
AuThreadPrimitives::Event running;
|
AuThreadPrimitives::Event running;
|
||||||
//bool running;
|
|
||||||
AuList<AsyncAppWaitSourceRequest> loopSources;
|
|
||||||
AuList<WorkEntry_t> pendingWorkItems;
|
AuList<WorkEntry_t> pendingWorkItems;
|
||||||
int cookie {0};
|
|
||||||
bool bAlreadyDoingExitTick {};
|
bool bAlreadyDoingExitTick {};
|
||||||
|
|
||||||
//
|
|
||||||
AuThreadPrimitives::SpinLock externalFencesLock;
|
AuThreadPrimitives::SpinLock externalFencesLock;
|
||||||
AuList<AuThreading::IWaitable *> externalFences;
|
AuList<AuThreading::IWaitable *> externalFences;
|
||||||
|
|
||||||
//
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//
|
|
||||||
};
|
};
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user