/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuRPCServer.cpp Date: 2022-6-29 Author: Reece ***/ #include #include "AuRPC.hpp" #include "AuRPCServer.hpp" #include "AuRPCServerChannel.hpp" #include "AuRPCRequest.hpp" bool AuRPCServer::Init(AuRPC *parent, AuAsync::WorkerPId_t worker) { if (this->channel) { SysPushErrorGen("Double init"); return {}; } this->worker = worker; this->parent = parent; this->channel = NewChannel(false); if (!this->channel) { return false; } this->channel->MakeTemp(); return true; } bool AuRPCServer::RegisterService(const AuSPtr &service) { if (!service) { SysPushErrorArg(); return {}; } AU_LOCK_GUARD(this->lock->AsWritable()); return AuTryInsert(this->serviceTable, service->GetId(), service); } void AuRPCServer::BroadcastMessage(const AuMemoryViewRead &view) { AuList> channels; { AU_LOCK_GUARD(this->lock->AsReadable()); for (const auto &pChannel : this->channel->subchannels_) { if (pChannel) { if (!pChannel->isTempChannel_) { channels.push_back(pChannel); } } } } auto pMessage = AuMakeSharedPanic(); pMessage->PrepareResponse(kGeneralBroadcast); pMessage->buffer->Write(view.ptr, view.length); pMessage->FinalizeWrite(); for (const auto &pChannel : channels) { pChannel->SendResponse(pMessage); } } AuList> AuRPCServer::GetClients() { AuList> ret; if (auto pChannel = this->channel) { for (const auto &pClientChannel : pChannel->subchannels_) { ret.push_back(pClientChannel); } } return ret; } void AuRPCServer::SetCallbacks(const AuSPtr &pCallbacks) { this->pCallbacks = pCallbacks; } AuString AuRPCServer::ExportToString() { return ToPrimaryChannel()->ExportString(); } AuSPtr AuRPCServer::ToPrimaryChannel() { if (this->channel) { return this->channel; } return this->channel = NewChannel(true); } void AuRPCServer::Dispatch(AuRPCServerChannel *channel, const AuSPtr &request, AuByteBuffer *buffer) { AU_LOCK_GUARD(this->lock->AsReadable()); auto pResponse = AuMakeSharedPanic(); pResponse->cookie = request->cookie; pResponse->PrepareResponse(kResponseRPC); auto itr = this->serviceTable.find(request->serviceId); if (itr == this->serviceTable.end()) { pResponse->FinalizeWrite(); channel->SendResponse(pResponse); return; } try { pResponse->PrepareMessage(); AuSharedFuture refFuture; itr->second->Dispatch(pResponse, request->methodId, *buffer, refFuture); if (!refFuture) { pResponse->FinalizeWrite(); channel->SendResponse(pResponse); } else { auto pShared = channel->SharedFromThis(); refFuture->OnComplete([=]() { pResponse->FinalizeWrite(); pShared->SendResponse(pResponse); }); } } catch (...) { SysPushErrorCatch(); channel->FatalError(); } } AuSPtr AuRPCServer::NewChannel(bool bIsPrimary) { auto eh = AuMakeShared(this->parent->SharedFromThis(), AuSPtr(this->parent->SharedFromThis(), this)); if (!eh) { return {}; } if ((this->worker.second != AuAsync::kThreadIdAny) && (this->worker == AuAsync::GetCurrentWorkerPId())) { if (!eh->Init()) { return {}; } } else { DispatchOn(this->worker, [&]() { if (!eh->Init()) { eh.reset(); } })->BlockUntilComplete(); } return eh; }