AuroraRPC/Source/AuRPCClientChannel.cpp

225 lines
4.8 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCClientChannel.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCClientChannel.hpp"
#include "AuRPCRequest.hpp"
#include "AuRPCPipePacket.hpp"
bool AuRPCClientChannel::OnConnect()
{
auto request = AuMakeShared<AuRPCRequest>();
request->WriteHeaderConnect();
AuRPCPipePacket packet;
packet.clientRequest = request;
packet.protPacket = true;
packet.clientChannel = SharedFromThis();
this->pipe_.SendPacket(packet);
return true;
}
void AuRPCClientChannel::Disconnect()
{
this->pipe_.Deinit();
Finalize();
}
void AuRPCClientChannel::SendRequest(AuSPtr<AuIRPCRequest> response2)
{
auto response = AuStaticCast<AuRPCRequest>(response2);
if (!response)
{
SysPushErrorArg();
return;
}
RpcLogDebug("Client sending request of bytes: {}", response->GetData().length);
AuRPCPipePacket packet;
packet.clientRequest = response;
packet.clientChannel = SharedFromThis();
this->pipe_.SendPacket(packet);
}
AuSPtr<AuRPC> AuRPCClientChannel::ToContext()
{
return this->parent_;
}
void AuRPCClientChannel::OnDisconnect(bool error)
{
RpcLogDebug("AuRPCClientChannel::OnDisconnect");
if (this->bConnected_ || !this->bConnectingAlternate_)
{
Finalize();
}
}
bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view)
{
RpcLogDebug("AuRPCClientChannel::OnDataAvailable");
do
{
auto bytesAvailable = view.RemainingBytes();
if (bytesAvailable < 4)
{
return true;
}
auto oldRead = view.readPtr;
auto frameLength = view.Read<AuUInt32>();
if (!frameLength)
{
Disconnect();
return false;
}
if (frameLength > bytesAvailable)
{
view.readPtr = oldRead;
return true;
}
auto endPtr = oldRead + frameLength;
auto packetType = view.Read<AuUInt8>();
if (packetType == kResponseConnectOK)
{
this->bConnected_ = true;
this->ProcessConnectionOK();
}
else if (packetType == kResponseMulticonnect)
{
this->bConnectingAlternate_ = true;
this->pipe_.Deinit();
this->Init(view.Read<AuString>());
}
else if (packetType == kResponseRPC)
{
auto response = AuMakeShared<AuRPCResponse>();
if (!response)
{
SysPushErrorMem();
view.readPtr = oldRead;
return true;
}
auto oldLength = view.length;
view.length = (oldRead - view.base) + frameLength;
response->message = &view;
response->Deserialize();
this->ProcessResponse(response);
view.readPtr = endPtr;
view.length = oldLength;
}
} while (true);
return true;
}
bool AuRPCClientChannel::Init(const AuString &ipc)
{
return this->pipe_.Init(ipc);
}
void AuRPCClientChannel::FatalIOError()
{
RpcLogDebug("AuRPCClientChannel::FatalIOError");
Disconnect();
}
void AuRPCClientChannel::ProcessConnectionOK()
{
auto re = AuExchange(this->outstandingRequests, AuList<AuSPtr<AuRPCRequest>>{});
if (this->callbacks_)
{
this->callbacks_->OnConnect();
}
for (auto a : re)
{
this->SendRequest(a);
}
}
void AuRPCClientChannel::ProcessResponse(const AuSPtr<AuRPCResponse> &response)
{
RpcLogDebug("AuRPCClientChannel::ProcessResponse");
for (auto itr = this->outstandingRequests.begin();
itr != this->outstandingRequests.end();
)
{
if (response->cookie != AuUInt(itr->get()))
{
itr++;
continue;
}
auto req = *itr;
this->outstandingRequests.erase(itr);
if (req->callback)
{
req->callback->OnResponse(*response);
}
}
}
bool AuRPCClientChannel::IsConnected()
{
return this->bConnected_;
}
void AuRPCClientChannel::SetCallbacks(const AuSPtr<AuIRPCChannelCallbacks> &callbacks)
{
if (!callbacks)
{
SysPushErrorArg();
return;
}
if (IsConnected())
{
callbacks->OnConnect();
}
this->callbacks_ = callbacks;
}
void AuRPCClientChannel::Finalize()
{
auto re = AuExchange(this->outstandingRequests, AuList<AuSPtr<AuRPCRequest>>{});
if (AuExchange(this->bIsDead_, true))
{
return;
}
for (auto a : re)
{
a->callback->OnResponse(AuRPCResponse(ERPCError::eAborted));
}
if (this->callbacks_)
{
this->callbacks_->OnDisconnect();
this->callbacks_.reset();
}
}