AuroraRPC/Source/AuRPCServerChannel.cpp

335 lines
7.9 KiB
C++
Raw Permalink Normal View History

2022-07-02 22:08:52 +00:00
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCServerChannel.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCServerChannel.hpp"
#include "AuRPCRequest.hpp"
#include "AuRPCPipePacket.hpp"
// Builds a server-side channel bound to the given RPC context and server.
// The pipe member is handed a back-pointer to this channel, and the steady-clock
// time at construction is stored as the channel's connect time.
AuRPCServerChannel::AuRPCServerChannel(AuSPtr<AuRPC> parent, AuSPtr<AuRPCServer> server) :
    parent(parent),
    pipe(this),
    server_(server)
{
    uConnectTime_ = AuTime::SteadyClockNS();
}
2022-07-02 22:08:52 +00:00
// Detaches the channel from its server on destruction.
// NOTE(review): RemoveFromParent() may call SharedFromThis(); whether that is valid
// while the object is being destroyed depends on the SharedFromThis implementation - confirm.
AuRPCServerChannel::~AuRPCServerChannel()
{
    RemoveFromParent();
}
// Returns the RPC context this channel was constructed with.
AuSPtr<AuRPC> AuRPCServerChannel::ToContext()
{
    AuSPtr<AuRPC> pContext = this->parent;
    return pContext;
}
// Queues a response on the pipe. A response whose message carries the write-error
// flag is never sent; the channel is torn down through FatalError() instead.
void AuRPCServerChannel::SendResponse(AuSPtr<AuRPCResponse> response)
{
    const bool bSerializedOk = !response->message->flagWriteError;
    if (bSerializedOk)
    {
        AuRPCPipePacket outgoing;
        outgoing.serverResponse = response;
        this->pipe.SendPacket(outgoing);
        return;
    }

    this->FatalError();
}
// Returns the export string of the underlying pipe object (pipe.pipe->ExportToString()).
AuString AuRPCServerChannel::ExportString()
{
    const auto &pUnderlying = this->pipe.pipe;
    return pUnderlying->ExportToString();
}
// Connect notification: only logs the event and always reports success.
bool AuRPCServerChannel::OnConnect()
{
    RpcLogDebug("Server channel: on connect");

    const bool bAccepted = true;
    return bAccepted;
}
void AuRPCServerChannel::OnDisconnect(bool error)
{
RpcLogDebug("Server channel disconnected: {}", error);
this->RemoveFromParent();
}
void AuRPCServerChannel::RemoveFromParent()
{
2023-12-16 19:56:10 +00:00
if (auto pCallbacks = this->server_->pCallbacks)
{
if (!this->isTempChannel_)
{
pCallbacks->OnDisconnect(this->SharedFromThis());
}
}
if (auto pParent = this->server_)
{
AU_LOCK_GUARD(pParent->lock->AsWritable());
if (auto pChannel = pParent->channel)
{
AuRemoveIf(pChannel->subchannels_,
[=](AuSPtr<AuRPCServerChannel> pChannel) -> bool
{
return pChannel.get() == this;
});
}
}
2022-07-02 22:08:52 +00:00
}
// Consumes every complete frame currently buffered in `view`.
//
// Frame layout as parsed here:
//   u32 frameLength   total size of the frame, including this length field
//   u8  packetType    kRequestConnect / kRequestRPC / kGeneralServerMessage / kGeneralMassiveMessage
//   ... payload       kRequestRPC: u32 serviceId, u32 methodId, u64 cookie, then method arguments
//                     kGeneralMassiveMessage: a string naming a shared memory region
//
// While a payload is handed to the dispatcher or to the callbacks, the buffer's length and
// write head are temporarily clamped to the end of the frame; they are restored, and the
// read head is moved to the end of the frame, before the next frame is parsed.
//
// Returns false only after a fatal framing error (frame length too small to hold the
// header). Returns true in every other case, including "need more data" (the read head is
// left at the start of the incomplete frame) and the shared-memory failure paths.
bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view)
{
    // A frame must at least contain its own length prefix and the packet type byte.
    static constexpr AuUInt32 kMinimumFrameBytes = sizeof(AuUInt32) + sizeof(AuUInt8);

    do
    {
        auto bytesAvailable = view.RemainingBytes();
        if (bytesAvailable < 4)
        {
            return true;
        }

        auto oldRead = view.readPtr;
        auto frameLength = view.Read<AuUInt32>();

        // Previously only zero was rejected; lengths 1..4 cannot hold the header either and
        // would make the parser read the packet type out of the following frame.
        if (frameLength < kMinimumFrameBytes)
        {
            FatalError();
            return false;
        }

        if (frameLength > bytesAvailable)
        {
            // Incomplete frame: rewind so the length prefix is re-read on the next call.
            view.readPtr = oldRead;
            return true;
        }

        const auto oldLength = view.length;
        const auto oldWriteHead = view.writePtr;

        // Hide everything past the current frame from whoever consumes the payload.
        auto clampToFrame = [&]()
        {
            view.length = (oldRead - view.base) + frameLength;
            view.writePtr = view.base + view.length;
        };

        // Undo clampToFrame (a no-op if it was never applied) and skip to the frame end.
        auto leaveFrame = [&]()
        {
            view.readPtr = oldRead + frameLength;
            view.writePtr = oldWriteHead;
            view.length = oldLength;
        };

        auto packetType = view.Read<AuUInt8>();
        if (packetType == kRequestConnect)
        {
            if (this->isTempChannel_)
            {
                this->SendToNewChannel();
            }
            else
            {
                this->SendConnectOK();
            }
        }
        else if (packetType == kRequestRPC)
        {
            auto request = AuMakeShared<AuRPCRequest>();
            if (!request)
            {
                // Out of memory: leave the frame in the buffer so it can be retried.
                SysPushErrorMem();
                view.readPtr = oldRead;
                return true;
            }

            request->serviceId = view.Read<AuUInt32>();
            request->methodId = view.Read<AuUInt32>();
            request->cookie = view.Read<AuUInt64>();

            clampToFrame();
            this->server_->Dispatch(this, request, &view);
        }
        else if (packetType == kGeneralServerMessage)
        {
            clampToFrame();

            if (auto pCallbacks = this->server_->pCallbacks)
            {
                pCallbacks->OnMessage(this->SharedFromThis(),
                                      view);
            }
        }
        else if (packetType == kGeneralMassiveMessage)
        {
            clampToFrame();

            auto str = view.Read<AuString>();
            if (view)
            {
                auto pMemory = AuIPC::ImportSharedMemory(str);
                if (!pMemory)
                {
                    SysPushErrorIO("Couldnt open shared memory for large packet");
                    // Hand the buffer back unclamped; this path used to return with
                    // length/writePtr still truncated to the frame.
                    leaveFrame();
                    this->FatalError();
                    return true;
                }

                AuByteBuffer buf(pMemory->GetMemory());
                if (!buf)
                {
                    SysPushErrorMemory();
                    leaveFrame();
                    this->FatalError();
                    return true;
                }

                if (auto pCallbacks = this->server_->pCallbacks)
                {
                    pCallbacks->OnMessage(this->SharedFromThis(),
                                          buf);
                }
            }
        }

        // Always resume at the frame boundary. Connect frames and unknown packet types
        // previously left the read head wherever parsing stopped, so trailing bytes of such
        // a frame were misread as the start of the next one.
        leaveFrame();
    }
    while (true);

    return true;
}
// Returns the steady-clock timestamp (nanoseconds) captured when the channel was constructed.
AuUInt64 AuRPCServerChannel::GetConnectTimeNS()
{
    const AuUInt64 uConnectedAt = this->uConnectTime_;
    return uConnectedAt;
}
2023-12-16 22:15:23 +00:00
bool AuRPCServerChannel::SendMessage(const AuMemoryViewRead &view, bool bLarge)
{
2023-12-16 22:15:23 +00:00
if (!bLarge &&
view.length < this->parent->GetLargePacketLength())
{
auto pMessage = AuMakeShared<AuRPCResponseOwned>();
if (!pMessage)
{
return false;
}
pMessage->PrepareResponse(kGeneralClientMessage);
pMessage->buffer->Write(view.ptr, view.length);
pMessage->FinalizeWrite();
if (pMessage->message->flagWriteError)
{
return false;
}
this->SendResponse(pMessage);
return true;
}
else
{
auto pSharedMemory = AuIPC::NewSharedMemory(view.length);
if (!pSharedMemory)
{
return false;
}
auto dest = pSharedMemory->GetMemory();
AuMemcpy(dest.ptr, view.ptr, view.length);
auto pMessage = AuMakeShared<AuRPCResponseOwned>();
if (!pMessage)
{
return false;
}
pMessage->PrepareResponse(kGeneralMassiveMessage);
pMessage->buffer->Write(AuString(pSharedMemory->ExportToString()));
pMessage->FinalizeWrite();
if (pMessage->message->flagWriteError)
{
return false;
}
this->SendResponse(pMessage);
return true;
}
}
2022-07-02 22:08:52 +00:00
// Replies to a connect request with kResponseConnectOK and then reports the new
// connection to the server's callbacks, if any are installed.
// Failing to allocate the response is treated as a fatal channel error.
void AuRPCServerChannel::SendConnectOK()
{
    RpcLogDebug("AuRPCServerChannel::SendConnectOK");

    auto pReply = AuMakeShared<AuRPCResponseOwned>();
    if (pReply)
    {
        pReply->PrepareResponse(kResponseConnectOK);
        pReply->FinalizeWrite();
        this->SendResponse(pReply);

        if (auto pCallbacks = this->server_->pCallbacks)
        {
            pCallbacks->OnConnect(this->SharedFromThis());
        }
    }
    else
    {
        FatalError();
    }
}
// Handles a connect request on a temporary channel: asks the server for a new channel,
// records it in this channel's subchannel list, and replies with kResponseMulticonnect
// carrying the new channel's export string.
// Any failure (response allocation, channel creation, list insertion) is fatal.
void AuRPCServerChannel::SendToNewChannel()
{
    RpcLogDebug("AuRPCServerChannel::SendToNewChannel");

    auto pReply = AuMakeShared<AuRPCResponseOwned>();
    if (!pReply)
    {
        FatalError();
        return;
    }

    pReply->PrepareResponse(kResponseMulticonnect);

    auto pSubchannel = this->server_->NewChannel(false);

    // Short-circuit keeps the original order: insertion is attempted only for a valid channel.
    const bool bRegistered = pSubchannel && AuTryInsert(this->subchannels_, pSubchannel);
    if (!bRegistered)
    {
        this->FatalError();
        return;
    }

    pReply->buffer->Write(pSubchannel->ExportString());
    pReply->FinalizeWrite();
    this->SendResponse(pReply);
}
// Unrecoverable channel failure: logs the event and deinitializes the pipe.
void AuRPCServerChannel::FatalError()
{
    RpcLogDebug("AuRPCServerChannel::FatalError");

    auto &channelPipe = this->pipe;
    channelPipe.Deinit();
}
// Initializes the pipe using the pipe length configured on the owning RPC context and
// returns the pipe's own success flag.
bool AuRPCServerChannel::Init()
{
    const auto &configuredLength = this->parent->optPipeLength;
    return this->pipe.Init(configuredLength);
}