[+] Added WorkerId_t structure

[-] Removed WorkerId_t typedef
[*] Added shared support to some older threading apis that have yet to be updated
This commit is contained in:
Reece Wilson 2021-10-08 20:51:34 +01:00
parent 027f47c7f8
commit f559897b42
20 changed files with 213 additions and 86 deletions

View File

@ -21,20 +21,32 @@ namespace Aurora::Async
AUKN_SYM IAsyncApp *GetAsyncApp();
using ThreadGroup_t = AuUInt8;
using ThreadId_t = AuUInt16;
/// ThreadGroup_t:
/// 0 = system main thread
/// 1+ = user defined
///
using ThreadGroup_t = AuUInt8;
/// ThreadId_t:
/// -1 = invalid
/// index = tid/runner id
///
using WorkerId_t = AuPair<ThreadGroup_t, ThreadId_t>;
using ThreadId_t = AuUInt16;
static const ThreadId_t kThreadIdAny = -1;
struct WorkerId_t : AuPair<ThreadGroup_t, ThreadId_t>
{
WorkerId_t() : AuPair<ThreadGroup_t, ThreadId_t>(0, 0)
{}
WorkerId_t(ThreadGroup_t group) : AuPair<ThreadGroup_t, ThreadId_t>(group, kThreadIdAny)
{}
WorkerId_t(ThreadGroup_t group, ThreadId_t id) : AuPair<ThreadGroup_t, ThreadId_t>(group, id)
{}
WorkerId_t(const WorkerId_t &cpy) : AuPair<ThreadGroup_t, ThreadId_t>(cpy.first, cpy.second)
{}
};
struct WorkPriv
{

View File

@ -9,10 +9,46 @@
#include <Aurora/Parse/Parse.hpp>
namespace Aurora::Async
{
struct WorkerId_t;
}
namespace Aurora::Console::Commands
{
using CommandCallback_cb = std::function<void(const Parse::ParsedObject &)>;
AUKN_INTERFACE(ICommandSubscriber,
AUI_METHOD(void, onCommand, (const Parse::ParsedObject &, parseList))
);
AUKN_INTERFACE(ITextLineSubscriber,
AUI_METHOD(void, onProcessedLineUTF8, (const AuString &, line))
);
AUKN_SYM void AddCommand(const AuString &tag, const Parse::ParseObject &commandStructure, const CommandCallback_cb &callback);
AUKN_SYM void AddCommand(const AuString &tag, const Parse::ParseObject &commandStructure, const AuSPtr<ICommandSubscriber> &subscriber);
/**
* Dispatch a command to the main thread or aurora async overloaded command dispatcher thread worker id
*/
AUKN_SYM bool DispatchCommand(const AuString &string);
/**
* Parses `string` and dispatches the parsed command on the current thread instantly
*/
AUKN_SYM bool DispatchCommandThisThread(const AuString &string);
/**
* Parses `string` on the current thread and then schedules the ICommandSubscriber callback on the specified thread
*/
AUKN_SYM bool DispatchCommandToAsyncRunner(const AuString &string, Async::WorkerId_t id);
/**
* Simulates a processed stdin line given `string`
*/
AUKN_SYM bool DispatchRawLine(const AuString &string);
/**
* Hijacks the UTF-8 input line processor coming from the consoles. DispatchCommand remains unaffected
* Call with an empty interface pointer to reenable command processing
*/
AUKN_SYM void SetCallbackAndDisableCmdProcessing(const AuSPtr<ITextLineSubscriber> &subscriber);
}

View File

@ -15,15 +15,15 @@ namespace Aurora::Console
/// Writes a log message to the console subscribers and telemetry outputs
AUKN_SYM void WriteLine(const ConsoleMessage &msg);
/// nonblocking [!!!]
/// you must disable the stdio console logger before using this api [!!!]
/// you must not expect locale translation
/// Consider using the following function for asynchronous utf-8 processed line based input -
/// Aurora::Console::Commands::SetCallbackAndDisableCmdProcessing(...)
/// [!!!] nonblocking
/// [!!!] you must disable the stdio console logger before using this api
AUKN_SYM AuUInt32 ReadStdIn(void *buffer, AuUInt32 length);
/// nonblocking [!!!]
/// you should disable the stdio console logger before using this api [!!!]
/// expect system locale if the logger is not enabled
/// edge case, utf8 if windows and logger is enabled
/// [!!!] nonblocking
/// [!!!] you should disable the stdio console logger before using this api
/// [!!!] expect system locale if the logger is not enabled
AUKN_SYM AuUInt32 WriteStdIn(const void *buffer, AuUInt32 length);
AUKN_SYM void OpenLateStd();

View File

@ -107,8 +107,8 @@ namespace Aurora::Parse
struct ParseResult
{
AuString SyntaxError;
AuString DebugTree;
AuString syntaxError;
AuString debugTree;
ParsedObject result;
};

View File

@ -40,6 +40,12 @@
#define AUKN_SHARED_API(name, type, ...) AU_SHARED_API_EX(AUKN_SYM, name, type, ## __VA_ARGS__)
#if defined(_AURORA_RUNTIME_BUILD_API_INTERFACES)
#define AUKN_INTERFACE AUI_INTERFACE_IMPL
#else
#define AUKN_INTERFACE AUI_INTERFACE_FWD
#endif
#include "Memory/Memory.hpp"
#include "Console/Console.hpp"

View File

@ -19,7 +19,8 @@ namespace Aurora::Threading::Primitives
{
public:
virtual void WaitForSignal() = 0;
virtual void WaitForSignal(Aurora::Threading::IWaitable *waitable) = 0;
virtual void WaitForSignal(Threading::IWaitable *waitable) = 0;
virtual void WaitForSignal(const AuSPtr<Threading::IWaitable> &waitable) = 0;
virtual void Broadcast() = 0;
virtual void Signal() = 0;
};

View File

@ -19,11 +19,11 @@ namespace Aurora::Threading::Primitives
class IConditionVariable
{
public:
virtual IConditionMutex* GetMutex() = 0;
virtual AuSPtr<IConditionMutex> GetMutex() = 0;
virtual bool WaitForSignal(AuUInt32 timeout = 0) = 0;
virtual void Broadcast() = 0;
virtual void Signal() = 0;
};
AUKN_SHARED_API(ConditionVariable, IConditionVariable, IConditionMutex *);
AUKN_SHARED_API(ConditionVariable, IConditionVariable, const AuSPtr<IConditionMutex> &mutex);
}

View File

@ -106,7 +106,7 @@ namespace Aurora::Async
return false;
}
this->cvVariable = Threading::Primitives::ConditionVariableUnique(this->cvWorkMutex.get());
this->cvVariable = Threading::Primitives::ConditionVariableUnique(AuUnsafeRaiiToShared(this->cvWorkMutex));
if (!this->cvVariable)
{
return false;

View File

@ -24,63 +24,102 @@ namespace Aurora::Console::Commands
{
AuString tag;
Parse::ParseObject commandStructure;
CommandCallback_cb callback;
AuSPtr<ICommandSubscriber> callback;
Command(AuString tag, Parse::ParseObject commandStructure, const CommandCallback_cb &callback) : tag(tag), commandStructure(commandStructure), callback(callback) {}
Command(AuString tag, Parse::ParseObject commandStructure, CommandCallback_cb &&callback) : tag(tag), commandStructure(commandStructure), callback(std::move(callback)) {}
Command(AuString tag, Parse::ParseObject commandStructure, const AuSPtr<ICommandSubscriber> &callback) : tag(tag), commandStructure(commandStructure), callback(callback) {}
Command(AuString tag, Parse::ParseObject commandStructure, AuSPtr<ICommandSubscriber> &&callback) : tag(tag), commandStructure(commandStructure), callback(std::move(callback)) {}
};
struct CommandDispatch
{
Parse::ParsedObject arguments;
CommandCallback_cb callback;
AuSPtr<ICommandSubscriber> callback;
CommandDispatch(const Parse::ParsedObject &arguments, const CommandCallback_cb &callback) : arguments(arguments), callback(callback) {}
CommandDispatch(const Parse::ParsedObject &arguments, const AuSPtr<ICommandSubscriber> &callback) : arguments(arguments), callback(callback) {}
};
static bool Dispatch(const AuString &string)
enum class EDispatchType
{
AU_LOCK_GUARD(gPendingCommandsMutex);
AuString tag;
AuString cmdParse;
AuMach offset;
Parse::ParseResult res;
eNow,
eSys,
eAsync
};
static bool Dispatch(const AuString &string, EDispatchType type, Async::WorkerId_t workerId)
{
Parse::ParseResult res;
AuSPtr<ICommandSubscriber> callback;
auto commandSplit = string.find(" ");
if (commandSplit != AuString::npos)
{
tag = string.substr(0, commandSplit);
cmdParse = string.substr(commandSplit + 1);
AU_LOCK_GUARD(gPendingCommandsMutex);
{
AuString tag;
AuString cmdParse;
AuMach offset;
auto commandSplit = string.find(" ");
if (commandSplit != AuString::npos)
{
tag = string.substr(0, commandSplit);
cmdParse = string.substr(commandSplit + 1);
}
else
{
tag = string;
}
auto cmdItr = gCommands.find(tag);
if (cmdItr == gCommands.end())
{
LogWarn("Command {} does not exist", tag);
return false;
}
auto const &cmdEntry = cmdItr->second;
offset = 0;
Parse::ParseState consumable(Parse::StringToConsumable(cmdParse, offset));
auto status = Parse::Parse(consumable, cmdEntry.commandStructure, res);
if (!status)
{
LogWarn("Couldn't parse command {}", string);
return false;
}
if (type == EDispatchType::eSys)
{
gPendingCommands.push_back(CommandDispatch(res.result, cmdEntry.callback));
}
else
{
callback = cmdEntry.callback;
}
}
}
if (type == EDispatchType::eNow)
{
callback->onCommand(res.result);
}
else
{
tag = string;
Async::DispatchBasicWorkCallback<CommandDispatch, Async::AVoid>(workerId,
Async::TaskFromConsumerRefT<CommandDispatch, Async::AVoid>([](const CommandDispatch &dispatch) -> AuOptional<Async::AVoid>
{
dispatch.callback->onCommand(dispatch.arguments);
return {};
}),
{},
CommandDispatch(res.result, callback)
);
}
auto cmdItr = gCommands.find(tag);
if (cmdItr == gCommands.end())
{
LogWarn("Command {} does not exist", tag);
return false;
}
auto const &cmdEntry = cmdItr->second;
offset = 0;
Parse::ParseState consumable(Parse::StringToConsumable(cmdParse, offset));
auto status = Parse::Parse(consumable, cmdEntry.commandStructure, res);
if (!status)
{
LogWarn("Couldn't parse command {}", string);
return false;
}
gPendingCommands.push_back(CommandDispatch(res.result, cmdEntry.callback));
return true;
}
AUKN_SYM void AddCommand(const AuString &tag, const Parse::ParseObject &commandStructure, const CommandCallback_cb &callback)
AUKN_SYM void AddCommand(const AuString &tag, const Parse::ParseObject &commandStructure, const AuSPtr<ICommandSubscriber> &callback)
{
AU_LOCK_GUARD(gPendingCommandsMutex);
gCommands.insert(AuMakePair(tag, Command(tag, commandStructure, callback)));
@ -88,7 +127,35 @@ namespace Aurora::Console::Commands
AUKN_SYM bool DispatchCommand(const AuString &string)
{
return Dispatch(string);
return Dispatch(string, EDispatchType::eSys, {});
}
AUKN_SYM bool DispatchCommandThisThread(const AuString &string)
{
return Dispatch(string, EDispatchType::eNow, {});
}
AUKN_SYM bool DispatchCommandToAsyncRunner(const AuString &string, Async::WorkerId_t id)
{
return Dispatch(string, EDispatchType::eAsync, id);
}
static AuSPtr<ITextLineSubscriber> gExternalLineProcessor;
AUKN_SYM bool DispatchRawLine(const AuString &string)
{
if (gExternalLineProcessor)
{
gExternalLineProcessor->onProcessedLineUTF8(string);
return true;
}
return DispatchCommand(string);
}
AUKN_SYM void SetCallbackAndDisableCmdProcessing(const AuSPtr<ITextLineSubscriber> &subscriber)
{
gExternalLineProcessor = subscriber;
}
void UpdateDispatcher(AuOptional<Async::WorkerId_t> target)
@ -102,7 +169,7 @@ namespace Aurora::Console::Commands
auto commands = std::exchange(gPendingCommands, {});
for (const auto &command : commands)
{
command.callback(command.arguments);
command.callback->onCommand(command.arguments);
}
}
@ -113,7 +180,7 @@ namespace Aurora::Console::Commands
{
for (const auto &command : commands)
{
command.callback(command.arguments);
command.callback->onCommand(command.arguments);
}
}

View File

@ -60,7 +60,6 @@ namespace Aurora::Console
ConsoleFIO::Exit();
Hooks::Deinit();
}
AUKN_SYM void OpenLateStd()
{

View File

@ -149,7 +149,7 @@ void ConsoleFrame::OnCmd(wxCommandEvent &event)
textbox->Clear();
Aurora::Console::Commands::DispatchCommand(line);
Aurora::Console::Commands::DispatchRawLine(line);
}
WxSplitterLine *ConsoleFrame::NewSplitter(wxSize splitter, wxColor color)

View File

@ -20,6 +20,7 @@ namespace Aurora::Threading::Primitives
SemaphoreConditionVariableImpl();
void WaitForSignal(Aurora::Threading::IWaitable *waitable) override;
void WaitForSignal(const AuSPtr<Threading::IWaitable> &waitable) override;
void WaitForSignal() override;
void Signal() override;
void Broadcast() override;
@ -35,7 +36,12 @@ namespace Aurora::Threading::Primitives
}
void SemaphoreConditionVariableImpl::WaitForSignal(Aurora::Threading::IWaitable *waitable)
void SemaphoreConditionVariableImpl::WaitForSignal(Aurora::Threading::IWaitable *waitable)
{
return WaitForSignal(AuUnsafeRaiiToShared(waitable));
}
void SemaphoreConditionVariableImpl::WaitForSignal(const AuSPtr<Threading::IWaitable> &waitable)
{
x_.Lock();
waiters_++;

View File

@ -13,12 +13,12 @@
namespace Aurora::Threading::Primitives
{
ConditionVariableImpl::ConditionVariableImpl(IConditionMutex *mutex) : mutex_(dynamic_cast<IConditionMutexEx *>(mutex))
ConditionVariableImpl::ConditionVariableImpl(const AuSPtr<IConditionMutex> &mutex) : mutex_(std::dynamic_pointer_cast<ConditionMutexImpl>(mutex))
{
}
IConditionMutex *ConditionVariableImpl::GetMutex()
AuSPtr<IConditionMutex> ConditionVariableImpl::GetMutex()
{
return mutex_;
}
@ -81,7 +81,7 @@ namespace Aurora::Threading::Primitives
#endif
}
AUKN_SYM IConditionVariable *ConditionVariableNew(IConditionMutex *mutex)
AUKN_SYM IConditionVariable *ConditionVariableNew(const AuSPtr<IConditionMutex> &mutex)
{
return _new ConditionVariableImpl(mutex);
}

View File

@ -22,16 +22,16 @@
class ConditionVariableImpl : public IConditionVariable
{
public:
ConditionVariableImpl(IConditionMutex *mutex);
ConditionVariableImpl(const AuSPtr<IConditionMutex> &mutex);
IConditionMutex *GetMutex() override;
AuSPtr<IConditionMutex> GetMutex() override;
bool WaitForSignal(AuUInt32 timeout) override;
void Signal() override;
void Broadcast() override;
private:
std::atomic_int signals_, waiting_;
ConditionMutexImpl mutex_;
AuSPtr<ConditionMutexImpl> mutex_;
};
}

View File

@ -12,13 +12,13 @@
namespace Aurora::Threading::Primitives
{
ConditionVariableImpl::ConditionVariableImpl(IConditionMutex *mutex) : mutex_(dynamic_cast<IConditionMutexEx *>(mutex))
ConditionVariableImpl::ConditionVariableImpl(const AuSPtr<IConditionMutex> &mutex) : mutex_(std::dynamic_pointer_cast<IConditionMutexEx>(mutex))
{
InitializeConditionVariable(&winCond_);
}
IConditionMutex *ConditionVariableImpl::GetMutex()
AuSPtr<IConditionMutex> ConditionVariableImpl::GetMutex()
{
return mutex_;
}
@ -46,7 +46,7 @@ namespace Aurora::Threading::Primitives
WakeAllConditionVariable(&winCond_);
}
AUKN_SYM IConditionVariable *ConditionVariableNew(IConditionMutex *mutex)
AUKN_SYM IConditionVariable *ConditionVariableNew(const AuSPtr<IConditionMutex> &mutex)
{
return _new ConditionVariableImpl(mutex);
}

View File

@ -13,16 +13,16 @@ namespace Aurora::Threading::Primitives
class ConditionVariableImpl : public IConditionVariable
{
public:
ConditionVariableImpl(IConditionMutex *mutex);
ConditionVariableImpl(const AuSPtr<IConditionMutex> &mutex);
IConditionMutex *GetMutex() override;
AuSPtr<IConditionMutex> GetMutex() override;
bool WaitForSignal(AuUInt32 timeout) override;
void Signal() override;
void Broadcast() override;
private:
CONDITION_VARIABLE winCond_;
IConditionMutexEx *mutex_;
AuSPtr<IConditionMutexEx> mutex_;
};
}
#endif

View File

@ -13,7 +13,7 @@
namespace Aurora::Threading::Primitives
{
ConditionVariableImpl::ConditionVariableImpl(IConditionMutex *mutex) : mutex_(dynamic_cast<IConditionMutexEx *>(mutex))
ConditionVariableImpl::ConditionVariableImpl(const AuSPtr<IConditionMutex> &mutex) : mutex_(std::dynamic_pointer_cast<IConditionMutexEx>(mutex))
{
auto ret = pthread_cond_init(&pthreadCv_, NULL);
SysAssert(ret == 0, "Couldn't initialize CV");
@ -24,7 +24,7 @@ namespace Aurora::Threading::Primitives
pthread_cond_destroy(&pthreadCv_);
}
IConditionMutex *ConditionVariableImpl::GetMutex()
AuSPtr<IConditionMutex> ConditionVariableImpl::GetMutex()
{
return mutex_;
}
@ -69,7 +69,7 @@ namespace Aurora::Threading::Primitives
SysAssert(ret == 0, "Couldn't wake any CV waiters");
}
AUKN_SYM IConditionVariable *ConditionVariableNew(IConditionMutex *mutex)
AUKN_SYM IConditionVariable *ConditionVariableNew(const AuSPtr<IConditionMutex> &mutex)
{
return _new ConditionVariableImpl(mutex);
}

View File

@ -13,17 +13,17 @@ namespace Aurora::Threading::Primitives
class ConditionVariableImpl : public IConditionVariable
{
public:
ConditionVariableImpl(IConditionMutex *mutex);
ConditionVariableImpl(const AuSPtr<IConditionMutex> &mutex);
~ConditionVariableImpl();
IConditionMutex *GetMutex() override;
AuSPtr<IConditionMutex> GetMutex() override;
bool WaitForSignal(AuUInt32 timeout) override;
void Signal() override;
void Broadcast() override;
private:
pthread_cond_t pthreadCv_;
IConditionMutexEx *mutex_;
AuSPtr<IConditionMutexEx> mutex_;
};
}
#endif

View File

@ -21,7 +21,7 @@ namespace Aurora::Threading::Primitives
return false;
}
condition_ = ConditionVariableUnique(mutex_.get());
condition_ = ConditionVariableUnique(AuUnsafeRaiiToShared(mutex_));
if (!condition_)
{
return false;

View File

@ -10,7 +10,7 @@
namespace Aurora::Threading::Primitives
{
template<bool isread>
template<bool isread>
void RWLockAccessView<isread>::Unlock()
{
if constexpr (isread)
@ -68,7 +68,7 @@ namespace Aurora::Threading::Primitives
return false;
}
condition_ = ConditionVariableUnique(mutex_.get());
condition_ = ConditionVariableUnique(AuUnsafeRaiiToShared(mutex_));
if (!condition_)
{
return false;