/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuRPCPipe.cpp Date: 2022-6-29 Author: Reece ***/ #include #include "AuRPC.hpp" #include "AuRPCPipe.hpp" #include "AuRPCPipePacket.hpp" #include "AuRPCRequest.hpp" #include "AuRPCChannel.hpp" #include "AuRPCClientChannel.hpp" struct RPCTransaction : AuIO::IAsyncFinishedSubscriber, AuEnableSharedFromThis { AuSPtr transaction; AuMemoryViewRead view; AuRPCPipePacket packet; void OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) override { transaction->SetCallback({}); if (length != view.length) { this->Fail(); if (this->packet.clientChannel) { this->packet.clientChannel->FatalIOError(); } } else { this->Sent(); } } bool SendClient() { if (packet.protPacket) { return true; } if (!packet.clientRequest) { return false; } packet.clientRequest->state = ERPCRequestState::eSubmitting; if (!AuTryInsert(packet.clientChannel->outstandingRequests, packet.clientRequest)) { return false; } return true; } void Fail() { if (packet.clientChannel) { packet.clientRequest->callback->OnResponse(AuRPCResponse(ERPCError::eIOError)); packet.clientRequest->state = ERPCRequestState::eFailed; } this->Remove(); } void Remove() { if (packet.clientChannel) { AuTryRemove(packet.clientChannel->outstandingRequests, packet.clientRequest); } } void Sent() { if (packet.clientRequest) { packet.clientRequest->state = ERPCRequestState::eSent; } } }; bool AuRPCPipe::SendPacket(const AuRPCPipePacket &packet) { auto ioTransaction = this->pipe->NewAsyncTransaction(); if (!ioTransaction) { return false; } auto transactionObject = AuMakeShared(); if (!transactionObject) { return false; } transactionObject->packet = packet; transactionObject->view = packet.clientRequest ? packet.clientRequest->GetData() : AuMemoryViewRead(*packet.serverResponse->message); transactionObject->transaction = ioTransaction; transactionObject->transaction->SetCallback(transactionObject); AuWorkerPId_t pid; if (packet.clientChannel) { pid = this->channel->ToContext()->pinnedClientThread; } else { pid = this->channel->ToContext()->server.worker; } if (pid != AuAsync::GetCurrentWorkerPId()) { AuAsync::NewWorkFunction(pid, [=]() { transactionObject->SendClient(); if (!transactionObject->transaction->StartWrite(0, AuMemoryViewRead(transactionObject->view, transactionObject->SharedFromThis()))) { transactionObject->Fail(); } })->Dispatch(); } else { transactionObject->SendClient(); if (!transactionObject->transaction->StartWrite(0, AuMemoryViewRead(transactionObject->view, transactionObject->SharedFromThis()))) { transactionObject->Fail(); } } return true; } AuRPCPipe::~AuRPCPipe() { Deinit(); } AuRPCPipe::AuRPCPipe(AuRPCChannel* channel) : channel(channel) { } // IIOEventListenerFunctional void AuRPCPipe::OnIOTick() { this->OnConnect(); } void AuRPCPipe::OnIOFailure() { SysPushErrorIO("IO Connect Failure"); OnFatalError(); } void AuRPCPipe::OnIOComplete() { //looking for who the fuck asked //AuLogDbg("Connection watch operation completed successfully"); } // Pipe callbacks void AuRPCPipe::OnPipePartialEvent(AuUInt trasnferred) { RpcLogDebug("Consumed bytes"); } void AuRPCPipe::OnPipeSuccessEvent() { if (this->isClient_) { OnError(false); return; } RpcLogDebug("Waiting for next socket"); this->isOpenWork = GetRPCProcessor()->StartSimpleLSWatch(this->pipe->AsReadChannelIsOpen(), AuSPtr(this->channel->ToContext(), this)); } void AuRPCPipe::OnPipeFailureEvent() { RpcLogDebug("Couldn't listen..."); this->OnFatalError(); } void AuRPCPipe::OnPipeReallocEvent(bool bSuccess) { } // IIOBufferedStreamAvailable bool AuRPCPipe::OnDataAvailable(AuByteBuffer& view) { return this->channel->OnDataAvailable(view); } void AuRPCPipe::OnFatalError() { if (AuExchange(this->errored_, true)) { return; } OnError(true); } void AuRPCPipe::OnError(bool fatal) { this->channel->OnDisconnect(true); } void AuRPCPipe::OnConnect() { RpcLogDebug("RPC Pipe connection initiated. Beginning read..."); AuIO::IOPipeRequestAIO req; req.pAsyncTransaction = this->pipe->NewAsyncTransaction(); req.bIsStream = true; req.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart; req.output.handleBufferedStream.pOnData = AuSPtr(this->channel->ToContext(), this); req.pListener = AuSPtr(this->channel->ToContext(), this); // Create a pipe to process an asynchronous repeating stream read transaction of no particular read/EoS limit this->work = GetRPCProcessor()->ToPipeProcessor()->NewAIOPipe(req); if (!this->work) { SysPushErrorIO("Couldn't create an IO pipe for a new session connection"); this->OnFatalError(); return; } // Fire ye pipe consumption if (!this->work->Start()) { SysPushErrorIO("Couldn't start reading the IO pipe"); this->OnFatalError(); return; } if (!this->channel->OnConnect()) { SysPushErrorIO("Couldn't start the channel"); this->OnFatalError(); return; } } bool AuRPCPipe::Init(const AuString& str) { this->pipe = AuIPC::ImportPipe(str); if (!this->pipe) { SysPushErrorIO("Couldn't open an IPC pipe given handle: {}", str); return {}; } this->OnConnect(); this->isClient_ = true; return true;// WaitForOtherEnd(); } bool AuRPCPipe::Init(AuOptional optLength) { this->pipe = optLength ? AuIPC::NewPipeEx(optLength.value()) : AuIPC::NewPipe(); if (!this->pipe) { SysPushErrorIO("Couldn't spare the resources required for an IPC pipe"); return {}; } return WaitForOtherEnd(); } bool AuRPCPipe::WaitForOtherEnd() { this->isOpenWork = GetRPCProcessor()->StartSimpleLSWatchEx(this->pipe->AsReadChannelIsOpen(), AuSPtr(this->channel->ToContext(), this), true); return bool(this->isOpenWork); } void AuRPCPipe::Deinit() { if (this->work) { work->End(); work.reset(); } }