Jamie Reece Wilson
8944d8bd16
[+] IAsyncTimerCallback [+] ETickType.hpp [+] EWorkPriority.hpp [+] static IThreadPool::GetSelfIOProcessor() [+] static IThreadPool::GetSelfIONetInterface() [+] static IThreadPool::GetSelfIONetWorker() [-] [Source/Async/]AsyncRunnable.hpp [*] Begin encapsulating WorkerPId_t [*] WorkerPId_t no longer take strong pointers to prevent leaks given that these identifiers are copied and kept alive everywhere
135 lines
4.6 KiB
C++
135 lines
4.6 KiB
C++
#pragma once
|
|
|
|
namespace Aurora::IO::Net
|
|
{
|
|
// Forward declaration: this header only refers to NetSrvWorkers through a pointer
// (the NetWorker constructor argument and NetWorker::pParent_).
struct NetSrvWorkers;
|
|
|
|
struct NetWorker :
|
|
INetWorker,
|
|
AuEnableSharedFromThis<NetWorker>
|
|
{
|
|
NetWorker(NetSrvWorkers *pParent,
|
|
AuUInt8 workerIndex,
|
|
AuSPtr<IIOProcessor> pIOProcessor);
|
|
|
|
AuSPtr<AuLoop::ILSEvent> ToEvent();
|
|
|
|
bool IsOnThread();
|
|
|
|
AuSPtr<IIOProcessor> ToProcessor() override;
|
|
AuUInt8 GetWorkerIndex() override;
|
|
void Destroy() override;
|
|
|
|
bool IncrementIOEventTaskCounter();
|
|
void DecrementIOEventTaskCounter();
|
|
|
|
AuSPtr<IIOProcessorItem> pWorkItem_;
|
|
|
|
void AddSocket(ISocket *pSocket);
|
|
void RemoveSocket(ISocket *pSocket);
|
|
|
|
AuList<ISocket *> childSockets;
|
|
|
|
// Yea, so, we have some pretty lax rules about threading...
|
|
template <class Status_t, class Error_t = AuNullS, class Callable_t>
|
|
bool TryScheduleInternalTemplate(Callable_t &&work,
|
|
const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &callback)
|
|
{
|
|
struct Waiter : IIOProcessorWorkUnit
|
|
{
|
|
inline Waiter(const AuConsumer<const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &> worker,
|
|
const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &callbacks) :
|
|
callbacks_(callbacks),
|
|
worker_(worker)
|
|
{
|
|
}
|
|
|
|
inline Waiter(const AuConsumer<const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &> worker,
|
|
const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &callbacks,
|
|
AuAsync::WorkerPId_t origin) :
|
|
callbacks_(callbacks),
|
|
worker_(worker),
|
|
origin_(origin)
|
|
{
|
|
SysAssert(MakeProxy());
|
|
}
|
|
|
|
inline virtual void OnRun() override
|
|
{
|
|
//SysAssert(this->callbacks_);
|
|
SysAssert(this->worker_);
|
|
this->worker_(this->callbacks_);
|
|
}
|
|
|
|
inline virtual void OnCanceled() override
|
|
{
|
|
if (!this->callbacks_)
|
|
{
|
|
return;
|
|
}
|
|
//SysAssert(this->callbacks_);
|
|
this->callbacks_->OnFailure((void *)nullptr);
|
|
}
|
|
|
|
inline bool MakeProxy()
|
|
{
|
|
auto old = this->callbacks_;
|
|
auto temp = AuMakeShared<AuAsync::PromiseCallbackFunctional<Status_t, Error_t>>([=](const AuSPtr<Status_t> &response)
|
|
{
|
|
(void)AuAsync::DispatchOn(this->origin_,[response, callbacks = old]()
|
|
{
|
|
callbacks->OnSuccess(response.get());
|
|
});
|
|
},
|
|
[=](const AuSPtr<Error_t> &response)
|
|
{
|
|
(void)AuAsync::DispatchOn(this->origin_, [response, callbacks = old]()
|
|
{
|
|
callbacks->OnFailure(response.get());
|
|
});
|
|
});
|
|
|
|
if (!temp)
|
|
{
|
|
return {};
|
|
}
|
|
|
|
this->callbacks_ = temp;
|
|
return true;
|
|
}
|
|
|
|
const AuConsumer<const AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> &> worker_;
|
|
AuSPtr<AuAsync::PromiseCallback<Status_t, Error_t>> callbacks_;
|
|
AuAsync::WorkerPId_t origin_;
|
|
};
|
|
AuSPtr<Waiter> temp;
|
|
|
|
auto pid = AuAsync::GetCurrentWorkerPId();
|
|
if (pid)
|
|
{
|
|
temp = AuMakeShared<Waiter>(work, callback, pid);
|
|
}
|
|
else
|
|
{
|
|
temp = AuMakeShared<Waiter>(work, callback);
|
|
}
|
|
|
|
if (!temp)
|
|
{
|
|
SysPushErrorIO("No memory to allocate work unit");
|
|
return false;
|
|
}
|
|
|
|
return this->pIOProcessor_->SubmitIOWorkItem(temp);
|
|
}
|
|
|
|
private:
|
|
AuThreadPrimitives::SpinLock spinLock_;
|
|
AuSPtr<IIOProcessor> pIOProcessor_;
|
|
NetSrvWorkers *pParent_;
|
|
AuUInt8 workerIndex_;
|
|
AuSPtr<AuLoop::ILSEvent> pEvent_;
|
|
AuUInt32 dwAtomicEventCounter_ {};
|
|
|
|
};
|
|
} |