[-] Comment out noted problematic code. A fix must be implemented soon

[*] Harden Async.hpp utilities against various conditions one might expect in the wild
[+] Added SetWorkerIdIsThreadRunner
[*] Fix a bug where a cv signal and broadcast was inverted
This commit is contained in:
Reece Wilson 2021-10-16 00:01:49 +01:00
parent 45bf3e7a44
commit 5e0cc1ccfa
4 changed files with 80 additions and 32 deletions

View File

@ -238,7 +238,12 @@ namespace Aurora::Async
virtual void SetConsoleCommandDispatcher(WorkerId_t id) = 0;
// Spawning
virtual bool Spawn(WorkerId_t) = 0;
virtual bool Spawn(WorkerId_t workerId) = 0;
// Event runner threads release upon encountering a zero work condition allowing for a clean exit of event driven apps without the headache of carefully chaining together exit callbacks
// Applications that aren't designed around an event driven model should set callerOwns to true to keep the around during global work exhaustion
virtual void SetWorkerIdIsThreadRunner(WorkerId_t, bool runner) = 0;
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) = 0;
virtual AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() = 0;
virtual WorkerId_t GetCurrentThread() = 0;
@ -431,7 +436,10 @@ namespace Aurora::Async
}
else
{
pin->callback.onSuccess(pin->input, *pin->resultValue_);
if (pin->callback.onSuccess)
{
pin->callback.onSuccess(pin->input, *pin->resultValue_);
}
}
}
else
@ -459,7 +467,10 @@ namespace Aurora::Async
};
// TODO: this is somewhat evil. double alloc when we could reuse this
NewWorkItem(caller, AuMakeShared<BasicWorkStdFunc>(func, err))->Dispatch();
if (!NewWorkItem(caller, AuMakeShared<BasicWorkStdFunc>(func, err))->Dispatch())
{
pin->CallOnFailure(true);
}
}
}
catch (...)
@ -519,12 +530,27 @@ namespace Aurora::Async
private:
void DispatchFrame(ProcessInfo &info) override
{
if constexpr (AuIsBaseOfTemplate<Frame_t, decltype(frame)>::value)
{
if (!frame)
{
info.type = IWorkItemHandler::EProcessNext::eFinished;
return;
}
}
frame();
info.type = IWorkItemHandler::EProcessNext::eFinished;
}
void Shutdown() override
{
if constexpr (AuIsBaseOfTemplate<Cleanup_t, decltype(cleanup)>::value)
{
if (!cleanup)
{
return;
}
}
cleanup();
}
};
@ -535,6 +561,7 @@ namespace Aurora::Async
template<typename T = void, typename... Args, AU_TEMPLATE_ENABLE_WHEN(std::is_same_v<T, bool> || std::is_void<T>::value)>
static std::function<T(Args&&...)> TranslateAsyncFunctionToDispatcherWithThread(WorkerId_t id, std::function<void(Args...)> func)
{
if (!func) return {};
return [=](Args&&... in) -> T
{
auto work = AuMakeShared<BasicWorkStdFunc>([=]() -> void {
@ -563,6 +590,7 @@ namespace Aurora::Async
if (!work) ASYNC_ERROR("can't dispatch async call; out of memory");
work.task.onProcess = [=](const AVoid &) -> AuOptional<B>
{
if (!func) return B{};
return func(in...);
};
work.callback.onSuccess = [=](const AVoid &, const B &ret)
@ -591,6 +619,8 @@ namespace Aurora::Async
template<typename Info_t = AVoid, typename Result_t = AVoid, typename Task_t = FTask<Info_t, Result_t>, typename Job_t = FJob<Info_t, Result_t>>
static AuSPtr<Async::IWorkItem> DispatchBasicWorkCallback(const WorkerId_t &worker, const Task_t &task, const Job_t &job, const Info_t &inputParameters, bool enableWait = false)
{
// TOOD: use faster object if job parguments are invalid
// It would be nice if we didn't have to drag the job callback pair around with us
return Async::NewWorkItem(worker, AuMakeShared<Async::BasicWorkCallback<Info_t, Result_t>>(task, job, inputParameters), enableWait)->Dispatch();
}

View File

@ -35,7 +35,7 @@ namespace Aurora::Async
//STATIC_TLS(WorkerId_t, tlsWorkerId);
static Threading::Threads::TLSVariable<WorkerId_t, true> tlsWorkerId;
using WorkEntry_t = AuPair<AuOptional<ThreadId_t>, AuSPtr<IAsyncRunnable>>;
using WorkEntry_t = AuPair<ThreadId_t, AuSPtr<IAsyncRunnable>>;
struct AsyncAppWaitSourceRequest
{
@ -50,6 +50,8 @@ namespace Aurora::Async
{
WorkerId_t id;
bool eventDriven {};
AuUInt8 multipopCount = 1;
AuUInt32 lastFrameTime {};
@ -214,12 +216,12 @@ namespace Aurora::Async
if (target.second == Async::kThreadIdAny)
{
// sad :(
state->cvVariable->Broadcast();
state->cvVariable->Signal();
}
else
{
state->cvVariable->Signal();
// sad :(
state->cvVariable->Broadcast();
}
}
@ -298,7 +300,7 @@ namespace Aurora::Async
success = PollInternal(block);
success |= state->inLoopSourceMode;
}
} while (!block || success);
} while (success);
return success;
}
@ -429,14 +431,14 @@ namespace Aurora::Async
(state->pendingWorkItems.size() < state->multipopCount));
)
{
if (!itr->first.has_value())
if (itr->first == Async::kThreadIdAny)
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
continue;
}
if ((itr->first.has_value()) && (itr->first.value() == state->id.second))
if ((itr->first != Async::kThreadIdAny) && (itr->first == state->id.second))
{
state->pendingWorkItems.push_back(*itr);
itr = group->workQueue.erase(itr);
@ -511,11 +513,15 @@ namespace Aurora::Async
CtxPollReturn(state, magic, true);
if (runningTasks == 0)
{
ShutdownZero();
}
if (state->eventDriven)
{
if (runningTasks == 0)
{
ShutdownZero();
}
}
return true;
}
@ -645,6 +651,12 @@ namespace Aurora::Async
return shuttingdown_ || GetThreadState()->exiting;
}
void AsyncApp::SetWorkerIdIsThreadRunner(WorkerId_t workerId, bool callerOwns)
{
GetThreadHandle(workerId)->eventDriven = callerOwns;
}
bool AsyncApp::Spawn(WorkerId_t workerId)
{
AU_LOCK_GUARD(rwlock_->AsWritable());
@ -691,6 +703,7 @@ namespace Aurora::Async
threadState->running = Threading::Primitives::EventUnique(true, false, true);
threadState->syncSema = Threading::Primitives::SemaphoreUnique(0);
threadState->id = workerId;
threadState->eventDriven = true;
if (!threadState->IsSysThread())
{
@ -857,6 +870,8 @@ namespace Aurora::Async
Poll(true);
}
LogDbg("Thread leaving: {} {} {}", auThread->GetName(), auThread->Exiting(), job->shuttingdown);
if (id != WorkerId_t {0, 0})
{
AU_LOCK_GUARD(rwlock_->AsReadable());

View File

@ -30,8 +30,11 @@ namespace Aurora::Async
void SetConsoleCommandDispatcher(WorkerId_t id) override;
// Spawning
bool Spawn(WorkerId_t) override;
bool Spawn(WorkerId_t workerId) override;
void SetWorkerIdIsThreadRunner(WorkerId_t, bool runner) override;
Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;
WorkerId_t GetCurrentThread() override;

View File

@ -263,23 +263,23 @@ namespace Aurora::Debug
}
}
}
else if (strnicmp(descriptor->name, ".?AV", AuArraySize(".?AV") - 1) == 0)
{
/* Annoying
https://blog.quarkslab.com/visual-c-rtti-inspection.html
This structure is very important to identify an object since it contains its VFT (field pVFTable) and its mangled name. That's why it usually starts with ".?AV", which means "a C++ class". These structures are stored in the section ".data".
We decided to do pattern matching on ".?AV" to get the field name of _TypeInformation and thus retrieves the RTTICompleteObjectLocator.
... we would then have to traverse the hierarchy to determine the root most signature (std::exceptions vtable hash)
TODO(Reece): fix me, this is evil and shouldn't make it into the wild
Fix before 1.0
*/
auto exception = reinterpret_cast<std::exception *>(ExceptionInfo->ExceptionRecord->ExceptionInformation[1]);
if (IsReadable(exception))
{
entry.wincxx.str = exception->what();
}
}
//else if (strnicmp(descriptor->name, ".?AV", AuArraySize(".?AV") - 1) == 0)
//{
// /* Annoying
// https://blog.quarkslab.com/visual-c-rtti-inspection.html
// This structure is very important to identify an object since it contains its VFT (field pVFTable) and its mangled name. That's why it usually starts with ".?AV", which means "a C++ class". These structures are stored in the section ".data".
// We decided to do pattern matching on ".?AV" to get the field name of _TypeInformation and thus retrieves the RTTICompleteObjectLocator.
// ... we would then have to traverse the hierarchy to determine the root most signature (std::exceptions vtable hash)
// TODO(Reece): fix me, this is evil and shouldn't make it into the wild
// Fix before 1.0
// */
//
// auto exception = reinterpret_cast<std::exception *>(ExceptionInfo->ExceptionRecord->ExceptionInformation[1]);
// if (IsReadable(exception))
// {
// entry.wincxx.str = exception->what();
// }
//}
else if (strlen(descriptor->name) == 0)
{
auto exception = reinterpret_cast<std::exception *>(ExceptionInfo->ExceptionRecord->ExceptionInformation[1]);