[+] AuAsync::NewWorkFunction

[+] AuAsync::IThreadPool::NewWorkFunction
[+] AU_DEBUG_MEMCRUNCH
This commit is contained in:
Reece Wilson 2023-08-10 03:34:44 +01:00
parent a0c82788d9
commit 0f12603390
18 changed files with 173 additions and 105 deletions

View File

@ -26,16 +26,23 @@ namespace Aurora::Async
AUKN_SYM WorkerPId_t GetCurrentWorkerPId();
/// Async app only | Thread pools must use the IThreadPool::NewFence function
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task);
/**
* @brief Creates an asynchronous job object to be executed on an async runner
* @param worker A worker id pair. Supports AuAsync::kThreadIdAny to run on any runner within the group.
* @param task An interface to run the job/work/task in question
* @param supportsBlocking Optimization hint for IWorkItem::BlockUntilComplete()
* @return
*/
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking = false);
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task);
/**
* @brief
* @param worker
* @param func
* @return
*/
AUKN_SYM AuSPtr<IWorkItem> NewWorkFunction(const WorkerPId_t &worker, AuVoidFunc func);
/// Async app only | Thread pools must use the IThreadPool::NewFence function
AUKN_SYM AuSPtr<IWorkItem> NewFence();

View File

@ -157,26 +157,20 @@ public:
static AuSPtr<AuFuture<T, Error_t>> New()
{
AuDebug::AddMemoryCrunch();
auto pRet = AuSPtr<AuFuture<T, Error_t>>(new AuFuture(), AuDefaultDeleter<AuFuture<T, Error_t>> {});
AuDebug::DecMemoryCrunch();
return pRet;
AU_DEBUG_MEMCRUNCH;
return AuSPtr<AuFuture<T, Error_t>>(new AuFuture(), AuDefaultDeleter<AuFuture<T, Error_t>> {});
}
static AuSPtr<AuFuture<T, Error_t>> New(AuConsumer<Move_t> callback)
{
AuDebug::AddMemoryCrunch();
auto pRet = AuSPtr<AuFuture<T, Error_t>>(new AuFuture(callback), AuDefaultDeleter<AuFuture<T, Error_t>> {});
AuDebug::DecMemoryCrunch();
return pRet;
AU_DEBUG_MEMCRUNCH;
return AuSPtr<AuFuture<T, Error_t>>(new AuFuture(callback), AuDefaultDeleter<AuFuture<T, Error_t>> {});
}
static AuSPtr<AuFuture<T, Error_t>> New(AuConsumer<Move_t> callback, ErrorCallback_f onFailure)
{
AuDebug::AddMemoryCrunch();
auto pRet = AuSPtr<AuFuture<T, Error_t>>(new AuFuture(callback, onFailure), AuDefaultDeleter<AuFuture<T, Error_t>> {});
AuDebug::DecMemoryCrunch();
return pRet;
AU_DEBUG_MEMCRUNCH;
return AuSPtr<AuFuture<T, Error_t>>(new AuFuture(callback, onFailure), AuDefaultDeleter<AuFuture<T, Error_t>> {});
}
protected:
@ -271,13 +265,11 @@ private:
return;
}
AuDebug::AddMemoryCrunch();
AuAsync::NewWorkItem(this->pid.value(), AuMakeSharedPanic<AuAsync::BasicWorkStdFunc>([pThat = this->SharedFromThis()]
{
AU_LOCK_GUARD(pThat->mutex);
pThat->SubmitComplete();
}))->Dispatch();
AuDebug::DecMemoryCrunch();
}
}

View File

@ -74,8 +74,10 @@ namespace Aurora::Async
//
virtual AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker,
const AuSPtr<IWorkItemHandler> &task,
bool bSupportsBlocking = false) = 0;
const AuSPtr<IWorkItemHandler> &task) = 0;
virtual AuSPtr<IWorkItem> NewWorkFunction(const WorkerId_t &worker,
AuVoidFunc callback) = 0;
virtual AuSPtr<IWorkItem> NewFence() = 0;

View File

@ -83,15 +83,13 @@ namespace Aurora::Debug
AUKN_SYM AuResult<AuString> DemangleName(const AuString &pName);
AUKN_SYM void DebugBreak();
AUKN_SYM void AddMemoryCrunch();
AUKN_SYM void DecMemoryCrunch();
}
#include "SysErrors.hpp"
#include "SysPanic.hpp"
#include "SysAssertions.hpp"
#include "ErrorStack.hpp"
#include "MemoryCrunch.hpp"
struct AuDbgStringSharedException : AuStringException
{

View File

@ -0,0 +1,29 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: MemoryCrunch.hpp
Date: 2021-6-9 - 2023-10-08
Author: Reece
***/
#pragma once
namespace Aurora::Debug
{
AUKN_SYM void AddMemoryCrunch();
AUKN_SYM void DecMemoryCrunch();
struct MemoryCrunch
{
inline MemoryCrunch()
{
AddMemoryCrunch();
}
inline ~MemoryCrunch()
{
DecMemoryCrunch();
}
};
#define AU_DEBUG_MEMCRUNCH Aurora::Debug::MemoryCrunch AU_CONCAT(__crunch, __COUNTER__);
}

View File

@ -17,11 +17,7 @@
#include "Sinks.hpp"
#include "LogClasses.hpp"
namespace Aurora::Debug
{
AUKN_SYM void AddMemoryCrunch();
AUKN_SYM void DecMemoryCrunch();
}
#include <Aurora/Debug/MemoryCrunch.hpp>
namespace Aurora::Logging
{
@ -41,35 +37,31 @@ namespace Aurora::Logging
template<typename Line_t, typename ... T>
inline void WriteLinef(AuUInt8 level, const AuString &tag, const Line_t &msg, T&& ... args)
{
Aurora::Debug::AddMemoryCrunch();
AU_DEBUG_MEMCRUNCH;
try
{
WriteLine(level, ConsoleMessage(EAnsiColor::eReset, tag, fmt::format(msg, AuForward<T>(args)...)));
}
catch (...)
{
Aurora::Debug::DecMemoryCrunch();
throw;
return;
}
Aurora::Debug::DecMemoryCrunch();
}
template<typename Line_t, typename ... T>
inline void WriteLinef(AuUInt8 level, EAnsiColor color, const AuString &tag, const Line_t &msg, T&& ... args)
{
Aurora::Debug::AddMemoryCrunch();
AU_DEBUG_MEMCRUNCH;
try
{
WriteLine(level, ConsoleMessage(color, tag, fmt::format(msg, AuForward<T>(args)...)));
}
catch (...)
{
Aurora::Debug::DecMemoryCrunch();
{;
throw;
return;
}
Aurora::Debug::DecMemoryCrunch();
}
template<typename Line_t, typename ... T>

View File

@ -138,9 +138,14 @@ namespace Aurora::Async
return ThreadPool::Exiting();
}
AuSPtr<IWorkItem> AsyncApp::NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
AuSPtr<IWorkItem> AsyncApp::NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task)
{
return ThreadPool::NewWorkItem(worker, task, supportsBlocking);
return ThreadPool::NewWorkItem(worker, task);
}
AuSPtr<IWorkItem> AsyncApp::NewWorkFunction(const WorkerId_t &worker, AuVoidFunc callback)
{
return ThreadPool::NewWorkFunction(worker, callback);
}
AuSPtr<IWorkItem> AsyncApp::NewFence()

View File

@ -29,7 +29,8 @@ namespace Aurora::Async
bool Exiting() override;
AuUInt32 PollAndCount(bool bStrict = true) override;
AuUInt32 RunAllPending() override;
AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking) override;
AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task) override;
AuSPtr<IWorkItem> NewWorkFunction(const WorkerId_t &worker, AuVoidFunc callback) override;
AuSPtr<IWorkItem> NewFence() override;
Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;
AuBST<ThreadGroup_t, AuList<ThreadId_t>> GetThreads() override;

View File

@ -242,12 +242,13 @@ namespace Aurora::Async
gThread->Run();
}
bool Schedule(AuUInt64 ns, IThreadPoolInternal *pool,
WorkerId_t target,
AuSPtr<IAsyncRunnable> runnable)
{
AU_LOCK_GUARD(gSchedLock);
AU_DEBUG_MEMCRUNCH;
AuAtomicAdd(&pool->uAtomicCounter, 1u);
SchedNextTime(ns);
return AuTryInsert(gOrderedEntries,

View File

@ -136,7 +136,7 @@ namespace Aurora::Async
return;
}
AuDebug::AddMemoryCrunch();
AU_DEBUG_MEMCRUNCH;
if (bIncrement)
{
@ -156,8 +156,6 @@ namespace Aurora::Async
pWorker->cvVariable->Signal();
pWorker->eventLs->Set();
}
AuDebug::DecMemoryCrunch();
}
IThreadPool *ThreadPool::ToThreadPool()
@ -775,8 +773,7 @@ namespace Aurora::Async
}
AuSPtr<IWorkItem> ThreadPool::NewWorkItem(const WorkerId_t &worker,
const AuSPtr<IWorkItemHandler> &task,
bool bSupportsBlocking)
const AuSPtr<IWorkItemHandler> &task)
{
// Error pass-through
if (!task)
@ -784,12 +781,20 @@ namespace Aurora::Async
return {};
}
return AuMakeShared<WorkItem>(this, WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, worker }, task, bSupportsBlocking);
return AuMakeShared<WorkItem>(this, WorkerPId_t { this->SharedFromThis(), worker }, task);
}
AuSPtr<IWorkItem> ThreadPool::NewWorkFunction(const WorkerId_t &worker,
AuVoidFunc callback)
{
SysAssert(callback);
return AuMakeShared<FuncWorker>(this, WorkerPId_t { this->SharedFromThis(), worker }, AuMove(callback));
}
AuSPtr<IWorkItem> ThreadPool::NewFence()
{
return AuMakeShared<WorkItem>(this, AuAsync::GetCurrentWorkerPId(), AuSPtr<IWorkItemHandler>{}, true);
return AuMakeShared<WorkItem>(this, AuAsync::GetCurrentWorkerPId(), AuSPtr<IWorkItemHandler>{});
}
AuThreads::ThreadShared_t ThreadPool::ResolveHandle(WorkerId_t id)
@ -920,7 +925,7 @@ namespace Aurora::Async
pFeature->Init();
}));
auto pWorkItem = this->NewWorkItem(id, work, !bNonBlock)->Dispatch();
auto pWorkItem = this->NewWorkItem(id, work)->Dispatch();
SysAssert(pWorkItem);
if (!bNonBlock)

View File

@ -57,7 +57,8 @@ namespace Aurora::Async
virtual AuUInt32 PollAndCount(bool bStrict = true) override;
virtual AuUInt32 RunAllPending() override;
virtual AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking) override;
virtual AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task) override;
virtual AuSPtr<IWorkItem> NewWorkFunction(const WorkerId_t &worker, AuVoidFunc callback) override;
virtual AuSPtr<IWorkItem> NewFence() override;
virtual Threading::Threads::ThreadShared_t ResolveHandle(WorkerId_t) override;

View File

@ -13,10 +13,18 @@
namespace Aurora::Async
{
FuncWorker::FuncWorker(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
AuVoidFunc &&func) :
WorkItem(owner, worker, {}),
func(func)
{
}
WorkItem::WorkItem(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
const AuSPtr<IWorkItemHandler> &task,
bool bSupportsBlocking) :
const AuSPtr<IWorkItemHandler> &task) :
worker_(worker), task_(task), owner_(owner),
finishedEvent_(false, true, true)
{
@ -250,15 +258,11 @@ namespace Aurora::Async
Fail();
}
void WorkItem::RunAsyncLocked2()
void WorkItem::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
AU_LOCK_GUARD(this->lock2);
IWorkItemHandler::ProcessInfo info(true);
info.pool = this->owner_->ToThreadPool();
if (this->task_)
{
try
{
this->task_->DispatchFrame(info);
@ -271,7 +275,17 @@ namespace Aurora::Async
return;
}
}
}
void WorkItem::RunAsyncLocked2()
{
AU_LOCK_GUARD(this->lock2);
IWorkItemHandler::ProcessInfo info(true);
info.pool = this->owner_->ToThreadPool();
DispatchTask(info);
RunAsyncLocked2(info);
}
@ -448,37 +462,68 @@ namespace Aurora::Async
return AuStaticPointerCast<ThreadPool>(pool);
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
void FuncWorker::DispatchTask(IWorkItemHandler::ProcessInfo &info)
{
if (func)
{
func();
}
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerId_t &worker, const AuSPtr<IWorkItemHandler> &task)
{
AU_DEBUG_MEMCRUNCH;
if (!task)
{
SysPushErrorArg("WorkItem has null task. Running out of memory?");
return {};
}
return AuMakeShared<WorkItem>(GetWorkerInternal(), WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, worker }, task, supportsBlocking);
return AuMakeShared<WorkItem>(GetWorkerInternal(), WorkerPId_t { AuAsync::GetCurrentWorkerPId().pool, worker }, task);
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task, bool supportsBlocking)
AUKN_SYM AuSPtr<IWorkItem> NewWorkFunction(const WorkerPId_t &worker, AuVoidFunc func)
{
AU_DEBUG_MEMCRUNCH;
if (!func)
{
SysPushErrorArg("WorkItem has null function");
return {};
}
if (!worker)
{
SysPushErrorArg("invalid worker");
return {};
}
return AuMakeSharedThrow<FuncWorker>(GetWorkerInternal(worker.pool).get(), worker, AuMove(func));
}
AUKN_SYM AuSPtr<IWorkItem> NewWorkItem(const WorkerPId_t &worker, const AuSPtr<IWorkItemHandler> &task)
{
AU_DEBUG_MEMCRUNCH;
if (!task)
{
SysPushErrorArg("WorkItem has null task. Running out of memory?");
return {};
}
if (!worker.pool)
if (!worker)
{
SysPushErrorArg("WorkItem has null pool");
SysPushErrorArg("invalid worker");
return {};
}
return AuMakeShared<WorkItem>(GetWorkerInternal(worker.pool).get(), worker, task, supportsBlocking);
return AuMakeSharedThrow<WorkItem>(GetWorkerInternal(worker.pool).get(), worker, task);
}
AUKN_SYM AuSPtr<IWorkItem> NewFence()
{
return AuMakeShared<WorkItem>(GetWorkerInternal(), AuAsync::GetCurrentWorkerPId(), AuSPtr<IWorkItemHandler>{}, true);
return AuMakeShared<WorkItem>(GetWorkerInternal(), AuAsync::GetCurrentWorkerPId(), AuSPtr<IWorkItemHandler>{});
}
void *WorkItem::GetPrivateData()

View File

@ -18,8 +18,7 @@ namespace Aurora::Async
{
WorkItem(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
const AuSPtr<IWorkItemHandler> &task,
bool bSupportsBlocking);
const AuSPtr<IWorkItemHandler> &task);
~WorkItem();
AuSPtr<IWorkItem> WaitFor(const AuSPtr<IWorkItem> &workItem) override;
@ -60,6 +59,8 @@ namespace Aurora::Async
void DispatchEx(bool check);
void DispatchExLocked(bool check);
virtual void DispatchTask(IWorkItemHandler::ProcessInfo &info);
AuSPtr<IWorkItemHandler> task_;
WorkerPId_t worker_;
EWorkPrio prio_ = EWorkPrio::eNormalPrio;
@ -80,4 +81,16 @@ namespace Aurora::Async
bool Schedule();
void SendOff();
};
struct FuncWorker : WorkItem
{
FuncWorker(IThreadPoolInternal *owner,
const WorkerPId_t &worker,
AuVoidFunc &&func);
void DispatchTask(IWorkItemHandler::ProcessInfo &info) override;
private:
AuVoidFunc func;
};
}

View File

@ -105,15 +105,10 @@ namespace Aurora::Console::Commands
}
else
{
Async::DispatchWork<CommandDispatch>(workerId,
Async::TaskFromConsumerRefT<CommandDispatch>([](const CommandDispatch &dispatch) -> void
{
dispatch.callback->OnCommand(dispatch.arguments);
}),
{},
CommandDispatch(res.result, callback),
false
);
Async::NewWorkFunction(workerId, [=]() -> void
{
callback->OnCommand(res.result);
});
}
return true;
@ -186,7 +181,7 @@ namespace Aurora::Console::Commands
}
else
{
NewWorkItem(gCommandDispatcher, AuMakeShared<Async::BasicWorkStdFunc>(func), true)->Dispatch()->BlockUntilComplete();
NewWorkItem(gCommandDispatcher, AuMakeShared<Async::BasicWorkStdFunc>(func))->Dispatch()->BlockUntilComplete();
}
}

View File

@ -372,7 +372,8 @@ namespace Aurora::Debug
AUKN_SYM AuString StackTraceEntry::Stringify() const
{
AddMemoryCrunch();
AU_DEBUG_MEMCRUNCH;
const auto frame = *this;
AuString backTraceBuffer;
@ -418,19 +419,18 @@ namespace Aurora::Debug
backTraceBuffer += "\t\t[proprietary]\n";
}
DecMemoryCrunch();
return backTraceBuffer;
}
catch (...)
{
DecMemoryCrunch();
return {};
}
}
AUKN_SYM AuString StringifyStackTrace(const StackTrace &backtrace)
{
AddMemoryCrunch();
AU_DEBUG_MEMCRUNCH;
AuString backTraceBuffer;
try
@ -445,12 +445,10 @@ namespace Aurora::Debug
backTraceBuffer += frame.Stringify();
}
DecMemoryCrunch();
return backTraceBuffer;
}
catch (...)
{
DecMemoryCrunch();
return {};
}
}

View File

@ -266,17 +266,15 @@ static thread_local AuUInt tlsThrowCounter;
extern "C" AUKN_SYM void __stdcall _ReportMSVCSEH(void *exception, const void *throwInfo, void *caller)
{
AuDebug::AddMemoryCrunch();
AU_DEBUG_MEMCRUNCH;
if (!throwInfo)
{
AuDebug::DecMemoryCrunch();
return;
}
if (!exception)
{
AuDebug::DecMemoryCrunch();
return;
}
@ -286,14 +284,12 @@ extern "C" AUKN_SYM void __stdcall _ReportMSVCSEH(void *exception, const void *t
{
if (!GetModuleHandleExA(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS | GET_MODULE_HANDLE_EX_FLAG_UNCHANGED_REFCOUNT, reinterpret_cast<LPCSTR>(caller), reinterpret_cast<HMODULE *>(&handle)))
{
AuDebug::DecMemoryCrunch();
return;
}
}
if (handle == INVALID_HANDLE_VALUE)
{
AuDebug::DecMemoryCrunch();
return;
}
@ -301,7 +297,6 @@ extern "C" AUKN_SYM void __stdcall _ReportMSVCSEH(void *exception, const void *t
if ((tlsThrowCounter++) == 7) // TODO: this might be stupid. we should configure for panic on second
{
tlsThrowCounter--;
AuDebug::DecMemoryCrunch();
return;
}
@ -331,5 +326,4 @@ extern "C" AUKN_SYM void __stdcall _ReportMSVCSEH(void *exception, const void *t
Aurora::Exit::PostLevel(AuThreads::GetThread(), Aurora::Exit::ETriggerLevel::eProblematicEvent);
tlsThrowCounter--;
AuDebug::DecMemoryCrunch();
}

View File

@ -46,9 +46,10 @@ namespace Aurora::Exit
void PostLevel(AuThreads::IAuroraThread *thread, ETriggerLevel level)
{
AU_DEBUG_MEMCRUNCH;
bool bOldTerminatingValue;
AuDebug::AddMemoryCrunch();
{
AU_LOCK_GUARD(gMutex);
@ -75,7 +76,6 @@ namespace Aurora::Exit
{
if (AuAtomicTestAndSet(&gProblemCounter, 1))
{
AuDebug::DecMemoryCrunch();
return;
}
}
@ -102,7 +102,6 @@ namespace Aurora::Exit
// Has already sent eSafeTermination?
if (isPreempting)
{
AuDebug::DecMemoryCrunch();
return;
}
@ -123,7 +122,6 @@ namespace Aurora::Exit
Process::Exit(0);
}
}
AuDebug::DecMemoryCrunch();
}
AUKN_SYM bool ExitHandlerAdd(ETriggerLevel level, const AuSPtr<IExitSubscriber> &callback)

View File

@ -74,26 +74,18 @@ namespace Aurora::IO::Net
auto old = this->callbacks_;
auto temp = AuMakeShared<AuAsync::PromiseCallbackFunctional<Status_t, Error_t>>([=](const AuSPtr<Status_t> &response)
{
auto pWorkItem = this->origin_.pool->NewWorkItem(this->origin_, AuMakeShared<AuAsync::BasicWorkStdFunc>([response, callbacks = old]()
auto pWorkItem = this->origin_.pool->NewWorkFunction(this->origin_,[response, callbacks = old]()
{
callbacks->OnSuccess(response.get());
}, []()
{
SysPanic("Eh");
}));
SysAssert(pWorkItem);
});
SysAssert(pWorkItem->Dispatch(), "A network task failed to dispatch critically");
},
[=](const AuSPtr<Error_t> &response)
{
auto pWorkItem = this->origin_.pool->NewWorkItem(this->origin_, AuMakeShared<AuAsync::BasicWorkStdFunc>([response, callbacks = old]()
auto pWorkItem = this->origin_.pool->NewWorkFunction(this->origin_, [response, callbacks = old]()
{
callbacks->OnFailure(response.get());
}, []()
{
SysPanic("Eh");
}));
SysAssert(pWorkItem);
});
SysAssert(pWorkItem->Dispatch(), "A network task failed to dispatch critically");
});