AuroraRPC/Source/AuRPCServer.cpp
Jamie Reece Wilson aa7591967a [+] Broadcasts and client/server messages
[+] Async responses
[+] AuRPC::SetRecommendedPipeLength
2023-12-16 18:16:32 +00:00

189 lines
4.2 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCServer.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCServer.hpp"
#include "AuRPCServerChannel.hpp"
#include "AuRPCRequest.hpp"
bool AuRPCServer::Init(AuRPC *parent, AuAsync::WorkerPId_t worker)
{
if (this->channel)
{
SysPushErrorGen("Double init");
return {};
}
this->worker = worker;
this->parent = parent;
this->channel = NewChannel(false);
if (!this->channel)
{
return false;
}
this->channel->MakeTemp();
return true;
}
bool AuRPCServer::RegisterService(const AuSPtr<AuIRPCService> &service)
{
if (!service)
{
SysPushErrorArg();
return {};
}
AU_LOCK_GUARD(this->lock->AsWritable());
return AuTryInsert(this->serviceTable, service->GetId(), service);
}
void AuRPCServer::BroadcastMessage(const AuMemoryViewRead &view)
{
AuList<AuSPtr<AuRPCServerChannel>> channels;
{
AU_LOCK_GUARD(this->lock->AsReadable());
for (const auto &pChannel : this->channel->subchannels_)
{
if (pChannel)
{
if (!pChannel->isTempChannel_)
{
channels.push_back(pChannel);
}
}
}
}
auto pMessage = AuMakeSharedPanic<AuRPCResponseOwned>();
pMessage->PrepareResponse(kGeneralBroadcast);
pMessage->buffer->Write(view.ptr, view.length);
pMessage->FinalizeWrite();
for (const auto &pChannel : channels)
{
pChannel->SendResponse(pMessage);
}
}
AuList<AuSPtr<AuIRPCSession>> AuRPCServer::GetClients()
{
AuList<AuSPtr<AuIRPCSession>> ret;
if (auto pChannel = this->channel)
{
for (const auto &pClientChannel : pChannel->subchannels_)
{
ret.push_back(pClientChannel);
}
}
return ret;
}
void AuRPCServer::SetCallbacks(const AuSPtr<AuIRPCServerCallbacks> &pCallbacks)
{
this->pCallbacks = pCallbacks;
}
AuString AuRPCServer::ExportToString()
{
return ToPrimaryChannel()->ExportString();
}
AuSPtr<AuRPCServerChannel> AuRPCServer::ToPrimaryChannel()
{
if (this->channel)
{
return this->channel;
}
return this->channel = NewChannel(true);
}
void AuRPCServer::Dispatch(AuRPCServerChannel *channel,
const AuSPtr<AuRPCRequest> &request,
AuByteBuffer *buffer)
{
AU_LOCK_GUARD(this->lock->AsReadable());
auto pResponse = AuMakeSharedPanic<AuRPCResponseOwned>();
pResponse->cookie = request->cookie;
pResponse->PrepareResponse(kResponseRPC);
auto itr = this->serviceTable.find(request->serviceId);
if (itr == this->serviceTable.end())
{
pResponse->FinalizeWrite();
channel->SendResponse(pResponse);
return;
}
try
{
pResponse->PrepareMessage();
AuSharedFuture<void> refFuture;
itr->second->Dispatch(pResponse, request->methodId, *buffer, refFuture);
if (!refFuture)
{
pResponse->FinalizeWrite();
channel->SendResponse(pResponse);
}
else
{
auto pShared = channel->SharedFromThis();
refFuture->OnComplete([=]()
{
pResponse->FinalizeWrite();
pShared->SendResponse(pResponse);
});
}
}
catch (...)
{
SysPushErrorCatch();
channel->FatalError();
}
}
AuSPtr<AuRPCServerChannel> AuRPCServer::NewChannel(bool bIsPrimary)
{
auto eh = AuMakeShared<AuRPCServerChannel>(this->parent->SharedFromThis(), AuSPtr<AuRPCServer>(this->parent->SharedFromThis(), this));
if (!eh)
{
return {};
}
if ((this->worker.second != AuAsync::kThreadIdAny) &&
(this->worker == AuAsync::GetCurrentWorkerPId()))
{
if (!eh->Init())
{
return {};
}
}
else
{
DispatchOn(this->worker, [&]()
{
if (!eh->Init())
{
eh.reset();
}
})->BlockUntilComplete();
}
return eh;
}