/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AuRPCClientChannel.cpp
    Date: 2022-6-29
    Author: Reece
***/
|
|
#include <AuroraRuntime.hpp>
|
|
#include "AuRPC.hpp"
|
|
#include "AuRPCClientChannel.hpp"
|
|
#include "AuRPCRequest.hpp"
|
|
#include "AuRPCPipePacket.hpp"
|
|
|
|
bool AuRPCClientChannel::OnConnect()
|
|
{
|
|
auto request = AuMakeShared<AuRPCRequest>();
|
|
request->WriteHeaderConnect();
|
|
AuRPCPipePacket packet;
|
|
packet.clientRequest = request;
|
|
packet.protPacket = true;
|
|
packet.clientChannel = SharedFromThis();
|
|
this->pipe_.SendPacket(packet);
|
|
return true;
|
|
}
|
|
|
|
void AuRPCClientChannel::Disconnect()
|
|
{
|
|
this->pipe_.Deinit();
|
|
Finalize();
|
|
}
|
|
|
|
/**
 * Submits a client request to the pipe.
 *
 * @param response2 the request to send. It is cast to AuRPCRequest; when the
 *                  cast yields null an argument error is pushed and nothing
 *                  is sent.
 */
void AuRPCClientChannel::SendRequest(AuSPtr<AuIRPCRequest> response2)
{
    auto request = AuStaticCast<AuRPCRequest>(response2);

    if (request)
    {
        RpcLogDebug("Client sending request of bytes: {}", request->GetData().length);

        // Wrap the request in a pipe packet addressed from this channel.
        AuRPCPipePacket outgoing;
        outgoing.clientRequest = request;
        outgoing.clientChannel = SharedFromThis();
        this->pipe_.SendPacket(outgoing);
    }
    else
    {
        SysPushErrorArg();
    }
}
|
|
|
|
/**
 * @return the `parent_` member, i.e. the AuRPC object this channel refers to.
 */
AuSPtr<AuRPC> AuRPCClientChannel::ToContext()
{
    return parent_;
}
|
|
|
|
void AuRPCClientChannel::OnDisconnect(bool error)
|
|
{
|
|
RpcLogDebug("AuRPCClientChannel::OnDisconnect");
|
|
|
|
if (this->bConnected_ || !this->bConnectingAlternate_)
|
|
{
|
|
Finalize();
|
|
}
|
|
}
|
|
|
|
/**
 * Consumes as many complete frames as `view` currently holds.
 *
 * Frame layout as parsed here:
 *   [AuUInt32 frameLength][AuUInt8 packetType][payload ...]
 * where `frameLength` counts the whole frame including its own four bytes.
 *
 * Handled packet types:
 *   - kResponseConnectOK    : marks the channel connected and runs ProcessConnectionOK().
 *   - kResponseMulticonnect : payload is a string naming another endpoint; the pipe is
 *                             deinitialized and re-initialized against it.
 *   - kResponseRPC          : payload is deserialized into an AuRPCResponse and dispatched
 *                             through ProcessResponse().
 *   - anything else         : the frame is skipped.
 *
 * @param view buffer to parse; its read pointer is advanced past every consumed frame and
 *             rewound to the frame start when a frame is incomplete.
 * @return true when parsing stopped to wait for more data (or after an allocation failure,
 *         with the frame left unread); false when a malformed frame length caused a
 *         Disconnect().
 */
bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view)
{
    RpcLogDebug("AuRPCClientChannel::OnDataAvailable");

    // Smallest well-formed frame: 4-byte length prefix + 1-byte packet type.
    constexpr AuUInt32 kMinFrameLength = 5;

    // The loop only ends through one of the return statements below.
    for (;;)
    {
        auto bytesAvailable = view.RemainingBytes();

        if (bytesAvailable < 4)
        {
            // Not even a length prefix yet; wait for more data.
            return true;
        }

        auto oldRead = view.readPtr;
        auto frameLength = view.Read<AuUInt32>();

        if (frameLength < kMinFrameLength)
        {
            // A frame cannot be shorter than its own header. Besides zero, lengths of 1..4
            // would place the frame end at or before the current read pointer and make the
            // type byte be read from outside the frame, so treat them as a protocol
            // violation as well.
            Disconnect();
            return false;
        }

        if (frameLength > bytesAvailable)
        {
            // Partial frame: rewind to its start and retry once more bytes have arrived.
            view.readPtr = oldRead;
            return true;
        }

        auto endPtr = oldRead + frameLength;

        auto packetType = view.Read<AuUInt8>();

        if (packetType == kResponseRPC)
        {
            auto response = AuMakeShared<AuRPCResponse>();
            if (!response)
            {
                SysPushErrorMem();
                view.readPtr = oldRead;
                return true;
            }

            // Temporarily clamp the buffer length to the end of this frame while the
            // response is deserialized from it, then restore it.
            auto oldLength = view.length;
            view.length = (oldRead - view.base) + frameLength;

            response->message = &view;

            response->Deserialize();
            this->ProcessResponse(response);

            view.readPtr = endPtr;
            view.length = oldLength;
        }
        else if (packetType == kResponseMulticonnect)
        {
            // Pull the alternate endpoint out of the buffer *before* tearing the pipe down,
            // so the read never depends on what Deinit() does to the pipe's state.
            const auto alternateEndpoint = view.Read<AuString>();

            this->bConnectingAlternate_ = true;
            this->pipe_.Deinit();
            this->Init(alternateEndpoint);

            // As before, the read pointer is left wherever the string read put it; the
            // buffer is deliberately not touched again after the pipe was re-initialized.
            continue;
        }
        else
        {
            if (packetType == kResponseConnectOK)
            {
                this->bConnected_ = true;
                this->ProcessConnectionOK();
            }

            // Always step to the end of the frame, including for packet types this client
            // does not know. Otherwise leftover payload bytes would be misread as the next
            // frame's length prefix.
            view.readPtr = endPtr;
        }
    }
}
|
|
|
|
bool AuRPCClientChannel::Init(const AuString &ipc)
|
|
{
|
|
return this->pipe_.Init(ipc);
|
|
}
|
|
|
|
void AuRPCClientChannel::FatalIOError()
|
|
{
|
|
RpcLogDebug("AuRPCClientChannel::FatalIOError");
|
|
Disconnect();
|
|
}
|
|
|
|
void AuRPCClientChannel::ProcessConnectionOK()
|
|
{
|
|
auto re = AuExchange(this->outstandingRequests, AuList<AuSPtr<AuRPCRequest>>{});
|
|
|
|
if (this->callbacks_)
|
|
{
|
|
this->callbacks_->OnConnect();
|
|
}
|
|
|
|
for (auto a : re)
|
|
{
|
|
this->SendRequest(a);
|
|
}
|
|
}
|
|
|
|
void AuRPCClientChannel::ProcessResponse(const AuSPtr<AuRPCResponse> &response)
|
|
{
|
|
RpcLogDebug("AuRPCClientChannel::ProcessResponse");
|
|
|
|
for (auto itr = this->outstandingRequests.begin();
|
|
itr != this->outstandingRequests.end();
|
|
itr++)
|
|
{
|
|
if (response->cookie != AuUInt(itr->get()))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
auto req = *itr;
|
|
this->outstandingRequests.erase(itr);
|
|
|
|
if (req->callback)
|
|
{
|
|
req->callback->OnResponse(*response);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
bool AuRPCClientChannel::IsConnected()
|
|
{
|
|
return this->bConnected_;
|
|
}
|
|
|
|
void AuRPCClientChannel::SetCallbacks(const AuSPtr<AuIRPCChannelCallbacks> &callbacks)
|
|
{
|
|
if (!callbacks)
|
|
{
|
|
SysPushErrorArg();
|
|
return;
|
|
}
|
|
|
|
if (IsConnected())
|
|
{
|
|
callbacks->OnConnect();
|
|
}
|
|
|
|
this->callbacks_ = callbacks;
|
|
}
|
|
|
|
void AuRPCClientChannel::Finalize()
|
|
{
|
|
auto re = AuExchange(this->outstandingRequests, AuList<AuSPtr<AuRPCRequest>>{});
|
|
|
|
if (AuExchange(this->bIsDead_, true))
|
|
{
|
|
return;
|
|
}
|
|
|
|
for (auto a : re)
|
|
{
|
|
a->callback->OnResponse(AuRPCResponse(ERPCError::eAborted));
|
|
}
|
|
|
|
if (this->callbacks_)
|
|
{
|
|
this->callbacks_->OnDisconnect();
|
|
this->callbacks_.reset();
|
|
}
|
|
}
|