/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IPCPromises.hpp Date: 2022-07-19 Author: Reece ***/ #pragma once struct AuNullS { AuUInt8 null; }; namespace Aurora::Async { struct IPromiseCompleter { virtual void Complete() = 0; virtual void Fail() = 0; }; struct IPromiseComplete { virtual void OnSuccess(void *in) = 0; }; struct IPromiseFailure { virtual void OnFailure(void *in) = 0; }; struct IPromisePtrs { virtual void *GetSuccessPtr() = 0; virtual void *GetErrorPtr() = 0; virtual AuSPtr GetData() = 0; }; template struct PromiseSuccessTmpl : IPromiseComplete { using OnSuccess_f = AuFunction &)>; inline virtual void OnSuccess(void *in); inline void Init(IPromisePtrs *parent, const OnSuccess_f &onDelegate) noexcept { this->parent_ = parent; this->onSuccess_ = onDelegate; } private: IPromisePtrs *parent_; OnSuccess_f onSuccess_; }; template struct PromiseCallback : IPromiseFailure, IPromiseComplete { inline PromiseCallback() {} inline virtual void OnFailure(void *in) override { this->OnFailure(AuReinterpretCast(in)); } inline virtual void OnSuccess(void *in) override { this->OnSuccess(AuReinterpretCast(in)); } virtual void OnFailure(ErrorValue_t *in) = 0; virtual void OnSuccess(SuccessValue_t *in) = 0; }; template struct PromiseCallbackFunctional : PromiseCallback, AuEnableSharedFromThis> { using OnSuccess_f = AuFunction &)>; using OnFailure_f = AuFunction &)>; inline PromiseCallbackFunctional(OnSuccess_f onSuccess, OnFailure_f onFailure) : onSuccess(onSuccess), onFailure(onFailure) { } inline PromiseCallbackFunctional(OnSuccess_f onSuccess) : onSuccess(onSuccess) { } inline PromiseCallbackFunctional() { } virtual void OnFailure(ErrorValue_t *in) override { if (this->onFailure) { if (!in) { this->onFailure({}); } else { this->onFailure(AuSPtr(AuSharedFromThis(), in)); } } } virtual void OnSuccess(SuccessValue_t *in) override { if (this->onSuccess) { if (!in) { this->onSuccess({}); } else { this->onSuccess(AuSPtr(AuSharedFromThis(), in)); } } } OnFailure_f onFailure; OnSuccess_f onSuccess; }; template struct PromiseFailTmpl : IPromiseFailure { using OnFailure_f = AuFunction &)>; inline virtual void OnFailure(void *in) override; inline void Init(IPromisePtrs *parent, const OnFailure_f &onDelegate) noexcept { this->parent_ = parent; this->onFailure_ = onDelegate; } private: IPromisePtrs *parent_; OnFailure_f onFailure_; }; template struct Promise; template struct IPromiseAccessor : IPromiseCompleter { inline virtual AuSPtr WriteIntoSuccess() = 0; inline virtual AuSPtr WriteIntoError() = 0; }; struct IPromiseDelegate { virtual void OnPromiseDelegate(const AuSPtr &promise) = 0; }; template struct PromiseCallTmpl : IPromiseDelegate { using OnDelegate_f = AuFunction> &)>; inline virtual void OnPromiseDelegate(const AuSPtr &promise) override; inline void Init(IPromisePtrs *parent, const OnDelegate_f &onDelegate) noexcept { this->parent_ = parent; this->onDelegate_ = onDelegate; } private: IPromisePtrs *parent_; OnDelegate_f onDelegate_; }; struct IPromisePrivateCallbacks { virtual void OnCBTick() = 0; virtual void OnCBDelegate() = 0; virtual void OnCBFailure() = 0; }; struct PromiseStartupAdapter : IWorkItemHandler { inline PromiseStartupAdapter(IPromisePrivateCallbacks *parent) noexcept { this->parent_ = parent; } inline virtual void DispatchFrame(IWorkItemHandler::ProcessInfo &info) override { this->parent_->OnCBDelegate(); }; inline virtual void OnFailure() override { SysPushErrorGeneric("Didnt fire initial delegate"); this->parent_->OnCBFailure(); }; private: IPromisePrivateCallbacks *parent_; }; struct PromiseTickAdapter : IWorkItemHandler { inline PromiseTickAdapter(IPromisePrivateCallbacks *parent) noexcept { this->parent_ = parent; } inline virtual void DispatchFrame(IWorkItemHandler::ProcessInfo &info) override { this->parent_->OnCBTick(); }; inline virtual void OnFailure() override { SysPushErrorGeneric("Didnt fire initial delegate"); this->parent_->OnCBFailure(); }; private: IPromisePrivateCallbacks *parent_; }; template struct Promise final : IPromiseAccessor, AuEnableSharedFromThis>, private IPromisePtrs, private IPromisePrivateCallbacks { using This_t = Promise; AU_SAFE_DESTROY(Promise); Promise(bool inlineResponse = false, bool allowSync = false); using FunctionalCall_t = PromiseCallTmpl; using FunctionalResponse_t = PromiseSuccessTmpl; using FunctionalFail_t = PromiseFailTmpl; using FunctionalResponse_f = typename FunctionalResponse_t::OnSuccess_f; using FunctionalFail_f = typename FunctionalFail_t::OnFailure_f; using FunctionalDelegate_f = typename FunctionalCall_t::OnDelegate_f; inline AuSPtr SetCallback(const AuSPtr> &callbacks); inline AuSPtr OnSuccessFunctional(const FunctionalResponse_f &in); inline AuSPtr OnSuccessCallback(const AuSPtr &in); inline AuSPtr OnFailureFunctional(const FunctionalFail_f &in); inline AuSPtr OnFailureCallback(const AuSPtr &in); inline AuSPtr WriteIntoSuccess() override; inline AuSPtr WriteIntoError() override; inline void Complete() override; inline void Fail() override; //inline AuSPtr GetBarrier() = 0; inline bool WaitBeforeWork(const AuSPtr &startingBarrier); inline bool BeginWork(WorkerPId_t worker = GetCurrentWorkerPId()); inline bool BeginWorkEx(const AuSPtr &promise, WorkerPId_t worker = GetCurrentWorkerPId()); inline bool BeginWorkFunc(const FunctionalDelegate_f &promise, WorkerPId_t worker = GetCurrentWorkerPId()); inline bool FinalizeWithWork(const AuSPtr &sendingFires); private: bool bHasBeganWork {}; AuSPtr startingBarrier_; PromiseStartupAdapter startupAdapter; PromiseTickAdapter tickAdapter; inline virtual void OnCBTick() override; inline virtual void OnCBDelegate() override; inline virtual void OnCBFailure() override; inline void DestroyPrivate(); inline void *GetSuccessPtr() override; inline void *GetErrorPtr() override; inline AuSPtr GetData() override; void DoStartBlocking(); void DoFailureBlocking(); void DoSuccessBlocking(); void DoEndBlocking(); void DispatchNotifier(); WorkerPId_t currentThread_; SuccessValue_t success; ErrorValue_t error; bool bHasSuccess_ {}; bool bHasError_ {}; bool bHasFiredSuccess_ {}; bool bHasFiredError_ {}; bool bInlineResponse_ {}; FunctionalResponse_t responseFunctional_; AuSPtr responseCallback_; FunctionalFail_t failureFunctional_; AuSPtr failureCallback_; FunctionalCall_t delegateFunctional_; AuSPtr delegateCallback_; bool bHasSentError_ {}; bool bHasSentSuccess_ {}; Aurora::Threading::Primitives::SpinLock lock_; AuSPtr notifier_; AuList> startBarriers_; }; template inline Promise::Promise(bool inlineResponse, bool allowSync) : startupAdapter(this), tickAdapter(this) { this->bInlineResponse_ = inlineResponse; auto pid = GetCurrentWorkerPId(); } template inline void Promise::DestroyPrivate() { if (this->notifier_) { this->notifier_->Cancel(); } this->notifier_.reset(); if (!this->bHasFiredSuccess_ && !this->bHasFiredError_) { this->bHasFiredError_ = true; DoFailureBlocking(); } } template inline void Promise::DoStartBlocking() {} template inline void Promise::DoEndBlocking() { this->failureCallback_.reset(); this->responseCallback_.reset(); this->currentThread_ = {}; } template inline void Promise::OnCBTick() { if (AuExchange(this->bHasFiredError_, false)) { DoFailureBlocking(); } else if (AuExchange(this->bHasFiredSuccess_, false)) { DoSuccessBlocking(); } else { SysPanic(""); } this->bHasSentSuccess_ = true; this->bHasSentError_ = true; } template inline void Promise::OnCBDelegate() { //SysAssert(this->currentThread_ == GetCurrentWorkerPId()); SysAssert(this->delegateCallback_); AuExchange(this->delegateCallback_, decltype(this->delegateCallback_) {})->OnPromiseDelegate(AuSharedFromThis()); } template inline void Promise::OnCBFailure() { Destroy(); } template inline void Promise::DoFailureBlocking() { { AU_LOCK_GUARD(this->lock_); if (AuExchange(this->bHasSentError_, true)) { return; } } this->DoStartBlocking(); if (this->failureCallback_) { this->failureCallback_->OnFailure(this->bHasError_ ? &this->error : nullptr); } this->DoEndBlocking(); } template inline void Promise::DoSuccessBlocking() { { AU_LOCK_GUARD(this->lock_); if (AuExchange(this->bHasSentError_, true)) { return; } if (AuExchange(this->bHasSentSuccess_, true)) { return; } } this->DoStartBlocking(); if (this->responseCallback_) { this->responseCallback_->OnSuccess(this->bHasSuccess_ ? &this->success : nullptr); } this->DoEndBlocking(); } template inline void *Promise::GetSuccessPtr() { return &this->success; } template inline void *Promise::GetErrorPtr() { return &this->error; } template inline AuSPtr Promise::GetData() { return AuSPtr(AuSharedFromThis()); } template inline void Promise::Complete() { SysAssert(!this->bHasFiredSuccess_); SysAssert(!this->bHasFiredError_); this->bHasFiredSuccess_ = true; if (this->bInlineResponse_ && this->currentThread_ == GetCurrentWorkerPId()) { this->DoSuccessBlocking(); } else { DispatchNotifier(); } } template inline void Promise::Fail() { SysAssert(!this->bHasFiredSuccess_); this->bHasFiredError_ = true; if (this->bInlineResponse_ && this->currentThread_ == GetCurrentWorkerPId()) { this->DoFailureBlocking(); } else { DispatchNotifier(); } } template inline void Promise::DispatchNotifier() { if (!this->notifier_) { this->notifier_ = this->currentThread_.pool->NewWorkItem(this->currentThread_, AuSPtr(AuSharedFromThis(), &this->tickAdapter)); } SysAssert(this->notifier_); this->notifier_->Dispatch(); } template inline bool Promise::WaitBeforeWork(const AuSPtr &startingBarrier) { if (!this->startingBarrier_) { this->startingBarrier_ = GetCurrentWorkerPId().pool->NewFence(); } if (!this->startingBarrier_) { return false; } this->startingBarrier_->WaitFor(startingBarrier); return true; } template inline bool Promise::BeginWork(WorkerPId_t worker) { this->currentThread_ = GetCurrentWorkerPId(); SysAssert(worker.pool); if (worker == GetCurrentWorkerPId()) { if (this->startingBarrier_) { this->startingBarrier_->Dispatch(); return true; } this->OnCBDelegate(); return true; } else { auto newItem = worker.pool->NewWorkItem(worker, AuSPtr(AuSharedFromThis(), &this->startupAdapter)); if (!newItem) { return {}; } if (this->startingBarrier_) { this->startingBarrier_->Then(newItem); } else { newItem->Dispatch(); } return true; } } template inline bool Promise::BeginWorkEx(const AuSPtr &promise, WorkerPId_t worker) { this->delegateCallback_ = promise; return BeginWork(worker); } template inline bool Promise::BeginWorkFunc(const FunctionalDelegate_f &promise, WorkerPId_t worker) { this->delegateFunctional_.Init(this, promise); return BeginWorkEx(AuUnsafeRaiiToShared(&this->delegateFunctional_), worker); } template inline AuSPtr> Promise::SetCallback(const AuSPtr> &callbacks) { this->OnSuccessCallback(callbacks); this->OnFailureCallback(callbacks); return AuSPtr(AuSharedFromThis()); } template inline AuSPtr> Promise::OnSuccessFunctional(const FunctionalResponse_f &in) { this->responseFunctional_.Init(this, in); return OnSuccessCallback(AuUnsafeRaiiToShared(&this->responseFunctional_)); } template inline AuSPtr> Promise::OnSuccessCallback(const AuSPtr &in) { bool succeedFast {}; { AU_LOCK_GUARD(this->lock_); if (this->bHasSentSuccess_) { succeedFast = true; } else { this->responseCallback_ = in; } } if (succeedFast) { in->OnSuccess(this->bHasSuccess_ ? &this->success : nullptr); } return AuSPtr(AuSharedFromThis()); } template inline AuSPtr> Promise::OnFailureFunctional(const FunctionalFail_f &in) { this->failureFunctional_.Init(this, in); return OnFailureCallback(AuUnsafeRaiiToShared(&this->failureFunctional_)); } template inline AuSPtr> Promise::OnFailureCallback(const AuSPtr &in) { bool failFast {}; { AU_LOCK_GUARD(this->lock_); if (this->bHasSentError_) { failFast = true; } else { this->failureCallback_ = in; } } if (failFast) { in->OnFailure(this->bHasError_ ? &this->error : nullptr); } return AuSPtr(AuSharedFromThis()); } template inline AuSPtr Promise::WriteIntoSuccess() { this->bHasSuccess_ = true; return AuSPtr(AuSharedFromThis(), &this->success); } template inline AuSPtr Promise::WriteIntoError() { this->bHasError_ = true; return AuSPtr(AuSharedFromThis(), &this->error); } template inline void PromiseCallTmpl::OnPromiseDelegate(const AuSPtr &promise) { auto ptr = static_cast *>(promise.get()); auto sptr = AuSPtr>(promise, ptr); SysAssert(this->onDelegate_); this->onDelegate_(sptr); } template inline void PromiseFailTmpl::OnFailure(void *in) { SysAssert(this->onFailure_); if (!in) { this->onFailure_({}); } else { this->onFailure_(AuSPtr(this->parent_->GetData(), (ErrorValue_t *) in)); } } template inline void PromiseSuccessTmpl::OnSuccess(void *in) { SysAssert(this->onSuccess_); if (!in) { this->onSuccess_({}); } else { this->onSuccess_(AuSPtr(this->parent_->GetData(), (SuccessValue_t *)in)); } } } using AuNullPromise = Aurora::Async::Promise; template using AuPromise = Aurora::Async::Promise;