AuroraRuntime/Source/IO/Net/AuNetWorker.hpp
Jamie Reece Wilson 0f12603390 [+] AuAsync::NewWorkFunction
[+] AuAsync::IThreadPool::NewWorkFunction
[+] AU_DEBUG_MEMCRUNCH
2023-08-10 03:36:19 +01:00

135 lines
4.8 KiB
C++

#pragma once
namespace Aurora::IO::Net
{
struct NetSrvWorkers;
struct NetWorker : INetWorker
{
NetWorker(NetSrvWorkers *pParent,
AuUInt8 workerIndex,
AuSPtr<IIOProcessor> pIOProcessor);
AuSPtr<AuLoop::ILSEvent> ToEvent();
bool IsOnThread();
AuSPtr<IIOProcessor> ToProcessor() override;
AuUInt8 GetWorkerIndex() override;
void Destroy() override;
bool IncrementIOEventTaskCounter();
void DecrementIOEventTaskCounter();
AuSPtr<IIOProcessorItem> pWorkItem_;
void AddSocket(ISocket *pSocket);
void RemoveSocket(ISocket *pSocket);
AuList<ISocket *> childSockets;
// Yea, so, we have some pretty lax rules about threading...
template <class Status_t, class Error_t = AuNullS, class Callable_t>
bool TryScheduleInternalTemplate(Callable_t &&work,
const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &callback)
{
struct Waiter : IIOProcessorWorkUnit
{
inline Waiter(const AuConsumer<const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &> worker,
const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &callbacks) :
callbacks_(callbacks),
worker_(worker)
{
}
inline Waiter(const AuConsumer<const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &> worker,
const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &callbacks,
AuAsync::WorkerPId_t origin) :
callbacks_(callbacks),
worker_(worker),
origin_(origin)
{
SysAssert(MakeProxy());
}
inline virtual void OnRun() override
{
//SysAssert(this->callbacks_);
SysAssert(this->worker_);
this->worker_(this->callbacks_);
}
inline virtual void OnCanceled() override
{
if (!this->callbacks_)
{
return;
}
//SysAssert(this->callbacks_);
this->callbacks_->OnFailure((void *)nullptr);
}
inline bool MakeProxy()
{
auto old = this->callbacks_;
auto temp = AuMakeShared<AuAsync::PromiseCallbackFunctional<Status_t, Error_t>>([=](const AuSPtr<Status_t> &response)
{
auto pWorkItem = this->origin_.pool->NewWorkFunction(this->origin_,[response, callbacks = old]()
{
callbacks->OnSuccess(response.get());
});
SysAssert(pWorkItem->Dispatch(), "A network task failed to dispatch critically");
},
[=](const AuSPtr<Error_t> &response)
{
auto pWorkItem = this->origin_.pool->NewWorkFunction(this->origin_, [response, callbacks = old]()
{
callbacks->OnFailure(response.get());
});
SysAssert(pWorkItem->Dispatch(), "A network task failed to dispatch critically");
});
if (!temp)
{
return {};
}
this->callbacks_ = temp;
return true;
}
const AuConsumer<const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &> worker_;
AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> callbacks_;
AuAsync::WorkerPId_t origin_;
};
AuSPtr<Waiter> temp;
auto pid = AuAsync::GetCurrentWorkerPId();
if (pid)
{
temp = AuMakeShared<Waiter>(work, callback, pid);
}
else
{
temp = AuMakeShared<Waiter>(work, callback);
}
if (!temp)
{
SysPushErrorIO("No memory to allocate work unit");
return false;
}
return this->pIOProcessor_->SubmitIOWorkItem(temp);
}
private:
AuThreadPrimitives::SpinLock spinLock_;
AuSPtr<IIOProcessor> pIOProcessor_;
NetSrvWorkers *pParent_;
AuUInt8 workerIndex_;
AuSPtr<AuLoop::ILSEvent> pEvent_;
AuUInt32 dwAtomicEventCounter_ {};
};
}