/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuRPCServerChannel.cpp Date: 2022-6-29 Author: Reece ***/ #include #include "AuRPC.hpp" #include "AuRPCServerChannel.hpp" #include "AuRPCRequest.hpp" #include "AuRPCPipePacket.hpp" AuRPCServerChannel::AuRPCServerChannel(AuSPtr parent, AuSPtr server) : parent(parent), pipe(this), server_(server) { } AuSPtr AuRPCServerChannel::ToContext() { return this->parent; } void AuRPCServerChannel::SendResponse(AuSPtr response) { if (response->message->flagWriteError) { this->FatalError(); return; } AuRPCPipePacket packet; packet.serverResponse = response; this->pipe.SendPacket(packet); } AuString AuRPCServerChannel::ExportString() { return this->pipe.pipe->ExportToString(); } bool AuRPCServerChannel::OnConnect() { RpcLogDebug("Server channel: on connect"); return true; } void AuRPCServerChannel::OnDisconnect(bool error) { RpcLogDebug("Server channel disconnected: {}", error); } bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view) { do { auto bytesAvailable = view.RemainingBytes(); if (bytesAvailable < 4) { return true; } auto oldRead = view.readPtr; auto frameLength = view.Read(); if (!frameLength) { FatalError(); return false; } if (frameLength > bytesAvailable) { view.readPtr = oldRead; return true; } auto packetType = view.Read(); if (packetType == kRequestConnect) { if (this->isTempChannel_) { this->SendToNewChannel(); } else { this->SendConnectOK(); } } else if (packetType == kRequestRPC) { auto request = AuMakeShared(); if (!request) { SysPushErrorMem(); view.readPtr = oldRead; return true; } request->serviceId = view.Read(); request->methodId = view.Read(); request->cookie = view.Read(); auto oldLength = view.length; view.length = (oldRead - view.base) + frameLength; this->server_->Dispatch(this, request, &view); view.readPtr = frameLength + oldRead; view.length = oldLength; } } while (true); return true; } void AuRPCServerChannel::SendConnectOK() { RpcLogDebug("AuRPCServerChannel::SendConnectOK"); auto res = AuMakeShared(); if (!res) { FatalError(); return; } res->PrepareResponse(kResponseConnectOK); res->FinalizeWrite(); this->SendResponse(res); } void AuRPCServerChannel::SendToNewChannel() { RpcLogDebug("AuRPCServerChannel::SendToNewChannel"); auto res = AuMakeShared(); if (!res) { FatalError(); return; } res->PrepareResponse(kResponseMulticonnect); auto channel = this->server_->NewChannel(); if (!channel) { this->FatalError(); return; } if (!AuTryInsert(this->subchannels_, channel)) { this->FatalError(); return; } res->buffer->Write(channel->ExportString()); res->FinalizeWrite(); this->SendResponse(res); } void AuRPCServerChannel::FatalError() { RpcLogDebug("AuRPCServerChannel::FatalError"); this->pipe.Deinit(); } bool AuRPCServerChannel::Init() { return this->pipe.Init(); }