[+] Broadcasts and client/server messages

[+] Async responses
[+] AuRPC::SetRecommendedPipeLength
This commit is contained in:
Reece Wilson 2023-12-16 18:16:32 +00:00
parent eb042e5a29
commit aa7591967a
13 changed files with 398 additions and 132 deletions

View File

@ -8,6 +8,7 @@
#pragma once
struct AuRPCRequest;
struct AuIRPCSession;
AUE_DEFINE(ERPCRequestState, (
ePending,
@ -31,34 +32,56 @@ AUE_DEFINE(ERPCError, (
struct AuRPCResponse
{
virtual ~AuRPCResponse()
inline virtual ~AuRPCResponse()
{
}
AuRPCResponse() : error(ERPCError::eNone)
inline AuRPCResponse() : error(ERPCError::eNone)
{
}
AuRPCResponse(ERPCError error) : error(error)
inline AuRPCResponse(ERPCError error) : error(error)
{
}
inline AuByteBuffer *GetBuffer()
{
return this->message;
}
inline ERPCError GetError()
{
return this->error;
}
inline void SetError(ERPCError error)
{
this->error = error;
}
private:
friend struct AuRPCResponseOwned;
friend struct AuRPCServerChannel;
friend struct AuRPCClientChannel;
friend struct AuRPCPipe;
friend struct AuRPCServer;
ERPCError error;
AuUInt64 cookie {};
AuByteBuffer *message {};
void Deserialize()
inline void Deserialize()
{
this->error = AuStaticCast<ERPCError>(message->Read<AuUInt8>());
this->cookie = message->Read<AuUInt64>();
}
void PrepareMessage()
inline void PrepareMessage()
{
message->Write<AuUInt8>(0);
message->Write<AuUInt64>(this->cookie);
}
void WriteError()
inline void WriteError()
{
message->Write<AuUInt8>(AuStaticCast<AuUInt8>(this->error));
message->Write<AuUInt64>(this->cookie);
@ -69,7 +92,7 @@ struct AuRPCResponseOwned : AuRPCResponse
{
AuSPtr<AuByteBuffer> buffer;
void PrepareResponse(AuUInt8 type)
inline void PrepareResponse(AuUInt8 type)
{
buffer = AuMakeShared<AuByteBuffer>();
SysAssert(buffer);
@ -78,7 +101,7 @@ struct AuRPCResponseOwned : AuRPCResponse
buffer->Write(type);
}
void FinalizeWrite()
inline void FinalizeWrite()
{
auto length = buffer->writePtr - buffer->base;
buffer->writePtr = buffer->base;
@ -93,7 +116,14 @@ AUI_INTERFACE(AuRPCRequestCallback,
AUI_INTERFACE(AuIRPCChannelCallbacks,
AUI_METHOD(void, OnConnect, ()),
AUI_METHOD(void, OnDisconnect, ())
AUI_METHOD(void, OnDisconnect, ()),
AUI_METHOD(void, OnBroadcast, (AuByteBuffer &, refReadOnly)),
AUI_METHOD(void, OnMessage, (AuByteBuffer &, refReadOnly))
);
AUI_INTERFACE(AuIRPCServerCallbacks,
AUI_METHOD(void, OnMessage, (AuSPtr<AuIRPCSession>, pClient,
AuByteBuffer &, refReadOnly))
);
struct AuIRPCRequest
@ -102,10 +132,17 @@ struct AuIRPCRequest
virtual bool SetData(const AuMemoryViewRead &view) = 0;
virtual bool EmptyRequest() = 0;
virtual void SetCallback(AuSPtr<AuRPCRequestCallback> callback) = 0;
virtual void SetCallback(const AuSPtr<AuRPCRequestCallback> &callback) = 0;
};
struct AuIRPCClientChannel
struct AuIRPCSession
{
virtual AuUInt64 GetConnectTimeNS() = 0;
virtual void SendMessage(const AuMemoryViewRead &view) = 0;
};
struct AuIRPCClientChannel :
AuIRPCSession
{
virtual void Disconnect() = 0;
virtual void SendRequest(AuSPtr<AuIRPCRequest> response) = 0;
@ -116,14 +153,19 @@ struct AuIRPCClientChannel
struct AuIRPCService
{
virtual AuUInt32 GetId() = 0;
virtual void Dispatch(AuRPCResponse &response, AuUInt32 id, AuByteBuffer& buffer) = 0;
virtual void Dispatch(const AuSPtr<AuRPCResponse> &pResponse,
AuUInt32 id,
AuByteBuffer &buffer,
AuSharedFuture<void> &refFuture) = 0;
};
struct AuIRPCServer
struct AuIRPCServer :
AuIPC::IExportableIPC
{
virtual bool RegisterService(const AuSPtr<AuIRPCService> service) = 0;
virtual AuString ExportString() = 0;
virtual bool RegisterService(const AuSPtr<AuIRPCService> &service) = 0;
virtual void BroadcastMessage(const AuMemoryViewRead &view) = 0;
virtual AuList<AuSPtr<AuIRPCSession>> GetClients() = 0;
virtual void SetCallbacks(const AuSPtr<AuIRPCServerCallbacks> &pCallbacks) = 0;
};
struct AuIRPC
@ -133,6 +175,8 @@ struct AuIRPC
virtual AuSPtr<AuIRPCServer> ToServer() = 0;
virtual AuSPtr<AuIRPCClientChannel> Connect(const AuString& str) = 0;
virtual void SetRecommendedPipeLength(AuUInt32 uLength) = 0;
};
AuSPtr<AuIRPCRequest> AuRPCNewRequest(AuUInt32 serviceId, AuUInt32 methodId);

View File

@ -60,6 +60,11 @@ AuSPtr<AuIRPCClientChannel> AuRPC::Connect(const AuString& str)
return eh;
}
void AuRPC::SetRecommendedPipeLength(AuUInt32 uLength)
{
this->optPipeLength = uLength;
}
AuSPtr<AuIRPC> AuRPCNewInstance()
{
return AuMakeShared<AuRPC>();

View File

@ -23,15 +23,18 @@ struct AuIRPCService;
struct AuRPC : AuIRPC, AuEnableSharedFromThis<AuRPC>
{
bool StartClient(AuAsync::WorkerPId_t worker) override;
bool StartServer(AuAsync::WorkerPId_t worker) override;
bool StartClient(AuAsync::WorkerPId_t worker);
bool StartServer(AuAsync::WorkerPId_t worker);
AuSPtr<AuIRPCServer> ToServer();
AuSPtr<AuIRPCClientChannel> Connect(const AuString& str);
AuSPtr<AuIRPCServer> ToServer() override;
AuSPtr<AuIRPCClientChannel> Connect(const AuString &str);
void SetRecommendedPipeLength(AuUInt32 uLength) override;
private:
friend struct AuRPCPipe;
friend struct AuRPCServerChannel;
AuOptional<AuUInt32> optPipeLength;
AuAsync::WorkerPId_t pinnedClientThread;
AuRPCServer server;
AuList<AuSPtr<AuRPCClientChannel>> clientChannels;
@ -39,12 +42,16 @@ private:
AuSPtr<AuIO::IIOProcessor> GetRPCProcessor();
static const auto kRequestConnect = 1;
static const auto kRequestRPC = 2;
static const auto kRequestConnect = 1u;
static const auto kRequestRPC = 2u;
static const auto kResponseConnectOK = 10;
static const auto kResponseMulticonnect = 11;
static const auto kResponseRPC = 12;
static const auto kResponseConnectOK = 10u;
static const auto kResponseMulticonnect = 11u;
static const auto kResponseRPC = 12u;
static const auto kGeneralBroadcast = 20u;
static const auto kGeneralClientMessage = 21u; // shared
static const auto kGeneralServerMessage = 21u; // shared
static const AuUInt8 kResponseError[512] = {0};

View File

@ -11,6 +11,11 @@
#include "AuRPCRequest.hpp"
#include "AuRPCPipePacket.hpp"
AuRPCClientChannel::AuRPCClientChannel(AuSPtr<AuRPC> parent) :
parent_(parent),
pipe_(this)
{ }
bool AuRPCClientChannel::OnConnect()
{
auto request = AuMakeShared<AuRPCRequest>();
@ -46,6 +51,19 @@ void AuRPCClientChannel::SendRequest(AuSPtr<AuIRPCRequest> response2)
this->pipe_.SendPacket(packet);
}
AuUInt64 AuRPCClientChannel::GetConnectTimeNS()
{
return this->uConnectTime_;
}
void AuRPCClientChannel::SendMessage(const AuMemoryViewRead &view)
{
auto pMessage = AuMakeSharedPanic<AuRPCRequest>();
pMessage->dataType = kGeneralServerMessage;
pMessage->SetData(view);
this->SendRequest(pMessage);
}
AuSPtr<AuRPC> AuRPCClientChannel::ToContext()
{
return this->parent_;
@ -115,14 +133,40 @@ bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view)
}
auto oldLength = view.length;
auto oldWriteHead = view.writePtr;
view.length = (oldRead - view.base) + frameLength;
view.writePtr = view.base + view.length;
response->message = &view;
response->Deserialize();
this->ProcessResponse(response);
view.readPtr = endPtr;
view.writePtr = oldWriteHead;
view.length = oldLength;
}
else if (packetType == kGeneralBroadcast ||
packetType == kGeneralClientMessage)
{
auto oldLength = view.length;
auto oldWriteHead = view.writePtr;
view.length = (oldRead - view.base) + frameLength;
view.writePtr = view.base + view.length;
if (auto pCallbacks = this->callbacks_)
{
if (packetType == kGeneralBroadcast)
{
pCallbacks->OnBroadcast(view);
}
else
{
pCallbacks->OnBroadcast(view);
}
}
view.readPtr = endPtr;
view.writePtr = oldWriteHead;
view.length = oldLength;
}
@ -144,6 +188,8 @@ void AuRPCClientChannel::FatalIOError()
void AuRPCClientChannel::ProcessConnectionOK()
{
this->uConnectTime_ = AuTime::SteadyClockNS();
auto re = AuExchange(this->outstandingRequests, AuList<AuSPtr<AuRPCRequest>>{});
if (this->callbacks_)

View File

@ -10,11 +10,12 @@
#include "AuRPCChannel.hpp"
#include "AuRPCPipe.hpp"
struct AuRPCClientChannel : AuRPCChannel, AuIRPCClientChannel, AuEnableSharedFromThis<AuRPCClientChannel>
struct AuRPCClientChannel :
AuRPCChannel,
AuIRPCClientChannel,
AuEnableSharedFromThis<AuRPCClientChannel>
{
AuRPCClientChannel(AuSPtr<AuRPC> parent) : parent_(parent), pipe_(this)
{
}
AuRPCClientChannel(AuSPtr<AuRPC> parent);
bool Init(const AuString& ipc);
void Finalize();
@ -24,6 +25,9 @@ struct AuRPCClientChannel : AuRPCChannel, AuIRPCClientChannel, AuEnableSharedFro
void FatalIOError();
virtual AuUInt64 GetConnectTimeNS() override;
virtual void SendMessage(const AuMemoryViewRead &view) override;
virtual AuSPtr<AuRPC> ToContext() override;
virtual bool OnConnect() override;
virtual void OnDisconnect(bool error) override;
@ -37,12 +41,12 @@ struct AuRPCClientChannel : AuRPCChannel, AuIRPCClientChannel, AuEnableSharedFro
virtual bool IsConnected() override;
virtual void SetCallbacks(const AuSPtr<AuIRPCChannelCallbacks> &callbacks) override;
private:
bool bConnectingAlternate_{};
bool bConnected_{};
bool bIsDead_ {};
AuSPtr<AuIRPCChannelCallbacks> callbacks_;
AuRPCPipe pipe_;
AuSPtr<AuRPC> parent_;
AuSPtr<AuRPC> parent_;
AuUInt64 uConnectTime_ {};
};

View File

@ -13,82 +13,83 @@
#include "AuRPCChannel.hpp"
#include "AuRPCClientChannel.hpp"
bool AuRPCPipe::SendPacket(const AuRPCPipePacket &packet)
struct RPCTransaction :
AuIO::IAsyncFinishedSubscriber,
AuEnableSharedFromThis<RPCTransaction>
{
AuSPtr<AuIO::IAsyncTransaction> transaction;
AuMemoryViewRead view;
AuRPCPipePacket packet;
struct RPCTransaction : AuIO::IAsyncFinishedSubscriber, AuEnableSharedFromThis<RPCTransaction>
void OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) override
{
AuSPtr<AuIO::IAsyncTransaction> transaction;
AuMemoryViewRead view;
AuRPCPipePacket packet;
void OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) override
transaction->SetCallback({});
if (length != view.length)
{
transaction->SetCallback({});
if (length != view.length)
this->Fail();
if (this->packet.clientChannel)
{
this->Fail();
if (this->packet.clientChannel)
{
this->packet.clientChannel->FatalIOError();
}
}
else
{
this->Sent();
this->packet.clientChannel->FatalIOError();
}
}
bool SendClient()
else
{
if (packet.protPacket)
{
return true;
}
if (!packet.clientRequest)
{
return false;
}
packet.clientRequest->state = ERPCRequestState::eSubmitting;
if (!AuTryInsert(packet.clientChannel->outstandingRequests, packet.clientRequest))
{
return false;
}
this->Sent();
}
}
bool SendClient()
{
if (packet.protPacket)
{
return true;
}
void Fail()
if (!packet.clientRequest)
{
if (packet.clientChannel)
{
packet.clientRequest->callback->OnResponse(AuRPCResponse(ERPCError::eIOError));
packet.clientRequest->state = ERPCRequestState::eFailed;
}
this->Remove();
return false;
}
void Remove()
packet.clientRequest->state = ERPCRequestState::eSubmitting;
if (!AuTryInsert(packet.clientChannel->outstandingRequests, packet.clientRequest))
{
if (packet.clientChannel)
{
AuTryRemove(packet.clientChannel->outstandingRequests, packet.clientRequest);
}
return false;
}
void Sent()
{
if (packet.clientRequest)
{
packet.clientRequest->state = ERPCRequestState::eSent;
}
}
};
return true;
}
void Fail()
{
if (packet.clientChannel)
{
packet.clientRequest->callback->OnResponse(AuRPCResponse(ERPCError::eIOError));
packet.clientRequest->state = ERPCRequestState::eFailed;
}
this->Remove();
}
void Remove()
{
if (packet.clientChannel)
{
AuTryRemove(packet.clientChannel->outstandingRequests, packet.clientRequest);
}
}
void Sent()
{
if (packet.clientRequest)
{
packet.clientRequest->state = ERPCRequestState::eSent;
}
}
};
bool AuRPCPipe::SendPacket(const AuRPCPipePacket &packet)
{
auto ioTransaction = this->pipe->NewAsyncTransaction();
if (!ioTransaction)
{
@ -107,17 +108,41 @@ bool AuRPCPipe::SendPacket(const AuRPCPipePacket &packet)
transactionObject->transaction->SetCallback(transactionObject);
transactionObject->SendClient();
if (!transactionObject->transaction->StartWrite(0, AuSPtr<AuMemoryViewRead>(transactionObject->SharedFromThis(), &transactionObject->view)))
AuWorkerPId_t pid;
if (packet.clientChannel)
{
transactionObject->Fail();
pid = this->channel->ToContext()->pinnedClientThread;
}
else
{
pid = this->channel->ToContext()->server.worker;
}
if (pid != AuAsync::GetCurrentWorkerPId())
{
AuAsync::NewWorkFunction(pid, [=]()
{
transactionObject->SendClient();
if (!transactionObject->transaction->StartWrite(0, AuSPtr<AuMemoryViewRead>(transactionObject->SharedFromThis(), &transactionObject->view)))
{
transactionObject->Fail();
}
})->Dispatch();
}
else
{
transactionObject->SendClient();
if (!transactionObject->transaction->StartWrite(0, AuSPtr<AuMemoryViewRead>(transactionObject->SharedFromThis(), &transactionObject->view)))
{
transactionObject->Fail();
}
}
return true;
}
AuRPCPipe::~AuRPCPipe()
{
Deinit();
@ -247,9 +272,12 @@ bool AuRPCPipe::Init(const AuString& str)
return true;// WaitForOtherEnd();
}
bool AuRPCPipe::Init()
bool AuRPCPipe::Init(AuOptional<AuUInt32> optLength)
{
this->pipe = AuIPC::NewPipe();
this->pipe = optLength ?
AuIPC::NewPipeEx(optLength.value()) :
AuIPC::NewPipe();
if (!this->pipe)
{
SysPushErrorIO("Couldn't spare the resources required for an IPC pipe");

View File

@ -41,7 +41,7 @@ struct AuRPCPipe : AuIO::IIOBufferedStreamAvailable, AuIO::IIOSimpleEventListene
bool SendPacket(const AuRPCPipePacket& packet);
bool Init(const AuString& str);
bool Init();
bool Init(AuOptional<AuUInt32> optLength);
bool WaitForOtherEnd();
void Deinit();

View File

@ -30,7 +30,7 @@ bool AuRPCRequest::SetData(const AuMemoryViewRead& view)
return bool(data);
}
void AuRPCRequest::SetCallback(AuSPtr<AuRPCRequestCallback> callback)
void AuRPCRequest::SetCallback(const AuSPtr<AuRPCRequestCallback> &callback)
{
this->callback = callback;
}
@ -59,7 +59,7 @@ void AuRPCRequest::WriteHeaderConnect()
void AuRPCRequest::WriteRPCHeader()
{
this->data.Write<AuUInt32>(this->packetLength);
this->data.Write<AuUInt8>(kRequestRPC);
this->data.Write<AuUInt8>(this->dataType);
this->data.Write<AuUInt32>(this->serviceId);
this->data.Write<AuUInt32>(this->methodId);
this->data.Write<AuUInt64>(AuUInt64(AuUInt(this)));

View File

@ -13,12 +13,13 @@ struct AuRPCRequest : AuIRPCRequest
ERPCRequestState state{};
AuUInt32 methodId{};
AuSPtr<AuRPCRequestCallback> callback;
int dataType { kRequestRPC };
AuUInt64 cookie {};
bool SetData(const AuByteBuffer& toRead) override;
bool SetData(const AuMemoryViewRead& view) override;
void SetCallback(AuSPtr<AuRPCRequestCallback> callback) override;
void SetCallback(const AuSPtr<AuRPCRequestCallback> &callback) override;
bool EmptyRequest() override;

View File

@ -19,16 +19,9 @@ bool AuRPCServer::Init(AuRPC *parent, AuAsync::WorkerPId_t worker)
return {};
}
this->lock = AuThreadPrimitives::RWLockUnique();
if (!this->lock)
{
SysPushErrorMem();
return {};
}
this->worker = worker;
this->parent = parent;
this->channel = NewChannel();
this->channel = NewChannel(false);
if (!this->channel)
{
@ -39,7 +32,7 @@ bool AuRPCServer::Init(AuRPC *parent, AuAsync::WorkerPId_t worker)
return true;
}
bool AuRPCServer::RegisterService(const AuSPtr<AuIRPCService> service)
bool AuRPCServer::RegisterService(const AuSPtr<AuIRPCService> &service)
{
if (!service)
{
@ -51,7 +44,57 @@ bool AuRPCServer::RegisterService(const AuSPtr<AuIRPCService> service)
return AuTryInsert(this->serviceTable, service->GetId(), service);
}
AuString AuRPCServer::ExportString()
void AuRPCServer::BroadcastMessage(const AuMemoryViewRead &view)
{
AuList<AuSPtr<AuRPCServerChannel>> channels;
{
AU_LOCK_GUARD(this->lock->AsReadable());
for (const auto &pChannel : this->channel->subchannels_)
{
if (pChannel)
{
if (!pChannel->isTempChannel_)
{
channels.push_back(pChannel);
}
}
}
}
auto pMessage = AuMakeSharedPanic<AuRPCResponseOwned>();
pMessage->PrepareResponse(kGeneralBroadcast);
pMessage->buffer->Write(view.ptr, view.length);
pMessage->FinalizeWrite();
for (const auto &pChannel : channels)
{
pChannel->SendResponse(pMessage);
}
}
AuList<AuSPtr<AuIRPCSession>> AuRPCServer::GetClients()
{
AuList<AuSPtr<AuIRPCSession>> ret;
if (auto pChannel = this->channel)
{
for (const auto &pClientChannel : pChannel->subchannels_)
{
ret.push_back(pClientChannel);
}
}
return ret;
}
void AuRPCServer::SetCallbacks(const AuSPtr<AuIRPCServerCallbacks> &pCallbacks)
{
this->pCallbacks = pCallbacks;
}
AuString AuRPCServer::ExportToString()
{
return ToPrimaryChannel()->ExportString();
}
@ -63,7 +106,7 @@ AuSPtr<AuRPCServerChannel> AuRPCServer::ToPrimaryChannel()
return this->channel;
}
return this->channel = NewChannel();
return this->channel = NewChannel(true);
}
void AuRPCServer::Dispatch(AuRPCServerChannel *channel,
@ -72,28 +115,40 @@ void AuRPCServer::Dispatch(AuRPCServerChannel *channel,
{
AU_LOCK_GUARD(this->lock->AsReadable());
auto response = AuMakeShared<AuRPCResponseOwned>();
SysAssert(response);
auto pResponse = AuMakeSharedPanic<AuRPCResponseOwned>();
response->cookie = request->cookie;
response->PrepareResponse(kResponseRPC);
pResponse->cookie = request->cookie;
pResponse->PrepareResponse(kResponseRPC);
auto itr = this->serviceTable.find(request->serviceId);
if (itr == this->serviceTable.end())
{
response->FinalizeWrite();
channel->SendResponse(response);
pResponse->FinalizeWrite();
channel->SendResponse(pResponse);
return;
}
try
{
response->PrepareMessage();
pResponse->PrepareMessage();
itr->second->Dispatch(*response, request->methodId, *buffer);
AuSharedFuture<void> refFuture;
itr->second->Dispatch(pResponse, request->methodId, *buffer, refFuture);
response->FinalizeWrite();
channel->SendResponse(response);
if (!refFuture)
{
pResponse->FinalizeWrite();
channel->SendResponse(pResponse);
}
else
{
auto pShared = channel->SharedFromThis();
refFuture->OnComplete([=]()
{
pResponse->FinalizeWrite();
pShared->SendResponse(pResponse);
});
}
}
catch (...)
{
@ -102,7 +157,7 @@ void AuRPCServer::Dispatch(AuRPCServerChannel *channel,
}
}
AuSPtr<AuRPCServerChannel> AuRPCServer::NewChannel()
AuSPtr<AuRPCServerChannel> AuRPCServer::NewChannel(bool bIsPrimary)
{
auto eh = AuMakeShared<AuRPCServerChannel>(this->parent->SharedFromThis(), AuSPtr<AuRPCServer>(this->parent->SharedFromThis(), this));
if (!eh)

View File

@ -13,21 +13,28 @@ struct AuRPCServerChannel;
struct AuRPCServer : AuIRPCServer
{
bool Init(AuRPC* parent, AuAsync::WorkerPId_t worker);
bool RegisterService(const AuSPtr<AuIRPCService> service) override;
bool RegisterService(const AuSPtr<AuIRPCService> &service) override;
void BroadcastMessage(const AuMemoryViewRead &view) override;
AuList<AuSPtr<AuIRPCSession>> GetClients() override;
void SetCallbacks(const AuSPtr<AuIRPCServerCallbacks> &pCallbacks) override;
AuString ExportString() override;
AuString ExportToString() override;
AuSPtr<AuRPCServerChannel> ToPrimaryChannel();
AuSPtr<AuRPCServerChannel> NewChannel();
AuSPtr<AuRPCServerChannel> NewChannel(bool bIsPrimary);
void Dispatch(AuRPCServerChannel *channel,
const AuSPtr<AuRPCRequest> &request,
AuByteBuffer *buffer);
private:
friend struct AuRPCServerChannel;
friend struct AuRPCPipe;
AuRPC* parent;
AuSPtr<AuRPCServerChannel> channel;
AuAsync::WorkerPId_t worker;
AuThreadPrimitives::RWLockUnique_t lock;
AuRWLock lock;
AuHashMap<AuUInt32, AuSPtr<AuIRPCService>> serviceTable;
AuSPtr<AuIRPCServerCallbacks> pCallbacks;
};

View File

@ -16,7 +16,12 @@ AuRPCServerChannel::AuRPCServerChannel(AuSPtr<AuRPC> parent, AuSPtr<AuRPCServer>
pipe(this),
server_(server)
{
this->uConnectTime_ = AuTime::SteadyClockNS();
}
AuRPCServerChannel::~AuRPCServerChannel()
{
this->RemoveFromParent();
}
AuSPtr<AuRPC> AuRPCServerChannel::ToContext()
@ -51,6 +56,24 @@ bool AuRPCServerChannel::OnConnect()
void AuRPCServerChannel::OnDisconnect(bool error)
{
RpcLogDebug("Server channel disconnected: {}", error);
this->RemoveFromParent();
}
void AuRPCServerChannel::RemoveFromParent()
{
if (auto pParent = this->server_)
{
AU_LOCK_GUARD(pParent->lock->AsWritable());
if (auto pChannel = pParent->channel)
{
AuRemoveIf(pChannel->subchannels_,
[=](AuSPtr<AuRPCServerChannel> pChannel) -> bool
{
return pChannel.get() == this;
});
}
}
}
bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view)
@ -107,11 +130,31 @@ bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view)
request->cookie = view.Read<AuUInt64>();
auto oldLength = view.length;
auto oldWriteHead = view.writePtr;
view.length = (oldRead - view.base) + frameLength;
view.writePtr = view.base + view.length;
this->server_->Dispatch(this, request, &view);
view.readPtr = frameLength + oldRead;
view.readPtr = oldRead + frameLength;
view.writePtr = oldWriteHead;
view.length = oldLength;
}
else if (packetType == kGeneralServerMessage)
{
auto oldLength = view.length;
auto oldWriteHead = view.writePtr;
view.length = (oldRead - view.base) + frameLength;
view.writePtr = view.base + view.length;
if (auto pCallbacks = this->server_->pCallbacks)
{
pCallbacks->OnMessage(this->SharedFromThis(),
view);
}
view.readPtr = oldRead + frameLength;
view.writePtr = oldWriteHead;
view.length = oldLength;
}
@ -121,6 +164,20 @@ bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view)
return true;
}
AuUInt64 AuRPCServerChannel::GetConnectTimeNS()
{
return this->uConnectTime_;
}
void AuRPCServerChannel::SendMessage(const AuMemoryViewRead &view)
{
auto pMessage = AuMakeSharedPanic<AuRPCResponseOwned>();
pMessage->PrepareResponse(kGeneralClientMessage);
pMessage->buffer->Write(view.ptr, view.length);
pMessage->FinalizeWrite();
this->SendResponse(pMessage);
}
void AuRPCServerChannel::SendConnectOK()
{
RpcLogDebug("AuRPCServerChannel::SendConnectOK");
@ -150,7 +207,7 @@ void AuRPCServerChannel::SendToNewChannel()
}
res->PrepareResponse(kResponseMulticonnect);
auto channel = this->server_->NewChannel();
auto channel = this->server_->NewChannel(false);
if (!channel)
{
this->FatalError();
@ -178,5 +235,5 @@ void AuRPCServerChannel::FatalError()
bool AuRPCServerChannel::Init()
{
return this->pipe.Init();
return this->pipe.Init(this->parent->optPipeLength);
}

View File

@ -10,9 +10,13 @@
#include "AuRPCChannel.hpp"
#include "AuRPCPipe.hpp"
struct AuRPCServerChannel : AuRPCChannel
struct AuRPCServerChannel :
AuRPCChannel,
AuIRPCSession,
AuEnableSharedFromThis<AuRPCServerChannel>
{
AuRPCServerChannel(AuSPtr<AuRPC> parent, AuSPtr<AuRPCServer> server);
~AuRPCServerChannel();
bool Init();
AuString ExportString();
@ -24,6 +28,9 @@ struct AuRPCServerChannel : AuRPCChannel
virtual void OnDisconnect(bool error) override;
virtual bool OnDataAvailable(AuByteBuffer& view) override;
virtual AuUInt64 GetConnectTimeNS() override;
virtual void SendMessage(const AuMemoryViewRead &view) override;
void FatalError();
void SendToNewChannel();
@ -33,11 +40,16 @@ struct AuRPCServerChannel : AuRPCChannel
{
this->isTempChannel_ = true;
}
void RemoveFromParent();
private:
friend struct AuRPCServer;
AuList<AuSPtr<AuRPCServerChannel>> subchannels_;
AuSPtr<AuRPCServer> server_;
bool isTempChannel_ {};
AuRPCPipe pipe;
AuSPtr<AuRPC> parent;
AuUInt64 uConnectTime_ {};
AuSPtr<AuRPC> parent;
};