From 82f87679d2a961c98bdee120117aeb46ce649a28 Mon Sep 17 00:00:00 2001 From: Reece Wilson Date: Sat, 2 Jul 2022 23:08:52 +0100 Subject: [PATCH] Initial Commit --- .gitignore | 218 +++++++++++++++++++++++++++ Aurora.json | 7 + Include/AuRPCAPI.hpp | 138 +++++++++++++++++ LICENSE | 8 + README.md | 68 +++++++++ Source/AuRPC.cpp | 66 +++++++++ Source/AuRPC.hpp | 59 ++++++++ Source/AuRPCChannel.hpp | 16 ++ Source/AuRPCClientChannel.cpp | 198 +++++++++++++++++++++++++ Source/AuRPCClientChannel.hpp | 47 ++++++ Source/AuRPCPipe.cpp | 268 ++++++++++++++++++++++++++++++++++ Source/AuRPCPipe.hpp | 49 +++++++ Source/AuRPCPipePacket.hpp | 16 ++ Source/AuRPCRequest.cpp | 87 +++++++++++ Source/AuRPCRequest.hpp | 36 +++++ Source/AuRPCServer.cpp | 133 +++++++++++++++++ Source/AuRPCServer.hpp | 33 +++++ Source/AuRPCServerChannel.cpp | 169 +++++++++++++++++++++ Source/AuRPCServerChannel.hpp | 43 ++++++ 19 files changed, 1659 insertions(+) create mode 100644 .gitignore create mode 100644 Aurora.json create mode 100644 Include/AuRPCAPI.hpp create mode 100644 LICENSE create mode 100644 README.md create mode 100644 Source/AuRPC.cpp create mode 100644 Source/AuRPC.hpp create mode 100644 Source/AuRPCChannel.hpp create mode 100644 Source/AuRPCClientChannel.cpp create mode 100644 Source/AuRPCClientChannel.hpp create mode 100644 Source/AuRPCPipe.cpp create mode 100644 Source/AuRPCPipe.hpp create mode 100644 Source/AuRPCPipePacket.hpp create mode 100644 Source/AuRPCRequest.cpp create mode 100644 Source/AuRPCRequest.hpp create mode 100644 Source/AuRPCServer.cpp create mode 100644 Source/AuRPCServer.hpp create mode 100644 Source/AuRPCServerChannel.cpp create mode 100644 Source/AuRPCServerChannel.hpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..675acaa --- /dev/null +++ b/.gitignore @@ -0,0 +1,218 @@ +# Aurora's general purpose JS/TS/C/C++/Go vs/vscode/intellij/codelite gitignore reference +# Almost usable for Java and Qt + +# Aurora build configuration +Build_CompilerWorkingDirectory/* +Build_Developers/* +Build_Ship/* +Build_Internal/* +Build_Develop/* +Build_Stage/* +Build_Ship/* +Build_Workspace/* +Build_Symbols/* +Build_Link/* +Build/Developers/* +Build/Ship/* +Build/Develop/* +Build/Stage/* +Build/Ship/* +Build/Workspace/* +Build/Symbols/* +Build/Link/* + +# License Headers VS extension +*.licenseheader + +# Binaries / object files +*.dll +*.exe +*.obj +*.so +*.so.* +*.la +*.lai +*.pdb +*.idb +*.exe~ +*.obj +*.dynlib +*.dylib +*.lib +*.d +*.o +*.a +*.la +*.slo +*.lo +*.out +# go unit test +*.test + +# Autogenerated project files +compile_flags.txt +*.mk +*.project +*cmake +Makefile +*.vcxproj +*.xcodeproj + +# IDE trash +.vscode +.vs +/*.gcno +.intellij +.clion +*.vcxproj.filters +*.vcxproj.user +*.tlog + +# OSX +.DS_Store +.AppleDouble +.LSOverride +xcuserdata/ + +# Win32 +Thumbs.db +Thumbs.db:encryptable +ehthumbs.db +ehthumbs_vista.db +*.lnk + +# Linux is trash and cant hotswap like NT +.nfs* +.fuse_hidden* + +# Ninja +.ninja_deps +.ninja_log + +# PID locks +*.pid +*.pid.lock + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# JetBrains +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# CMake +cmake-build-*/ + +# Android Studio +.idea/caches/build_file_checksums.ser + +# why would we ever ship this dir? +.idea/caches/* + +# NodeJS +npm-debug.log* +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# node-waf configuration +.lock-wscript + +# Dependency directories +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) +web_modules/ + +# TypeScript cache +*.tsbuildinfo + +# Optional npm cache directory +.npm + +# Optional eslint cache +.eslintcache + +# yarn v2 +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# VS Code Extensions +.vscode-test + +# Qt unit tests +target_wrapper.* + +# QtCreator +*.autosave + +# QtCreator Qml +*.qmlproject.user +*.qmlproject.user.* + +# QtCreator CMake +CMakeLists.txt.user* + +# QtCreator 4.8< compilation database +compile_commands.json + +# QtCreator local machine specific files for imported projects +*creator.user* + +*_qmlcache.qrc + +# QT cache and user files +/.qmake.cache +/.qmake.stash +*.pro.user +*.pro.user.* +*.qbs.user +*.qbs.user.* +*.moc + +# Java trash +hs_err_pid* +.gradle +gradle-app.setting +!gradle-wrapper.jar +.gradletasknamecache +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.mvn/wrapper/maven-wrapper.jar \ No newline at end of file diff --git a/Aurora.json b/Aurora.json new file mode 100644 index 0000000..357ae13 --- /dev/null +++ b/Aurora.json @@ -0,0 +1,7 @@ +{ + "type": "aurora", + "projectType": "StaticLib", + "name": "AuroraRPC", + "include-depends": ["AuroraRuntime"], + "depends": ["AuroraRuntime"] +} \ No newline at end of file diff --git a/Include/AuRPCAPI.hpp b/Include/AuRPCAPI.hpp new file mode 100644 index 0000000..6ab7744 --- /dev/null +++ b/Include/AuRPCAPI.hpp @@ -0,0 +1,138 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCAPI.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +struct AuRPCRequest; + +AUE_DEFINE(ERPCRequestState, ( + ePending, + eSubmitting, + eSent, + + eResponse, + eFailed +)); + +AUE_DEFINE(ERPCError, ( + eNone, + eMissingService, + eMissingMethod, + eProtocolError, + eIOError, + eChannelTerminated, + eChannelFailure, + eAborted +)); + +struct AuRPCResponse +{ + virtual ~AuRPCResponse() + { + } + AuRPCResponse() : error(ERPCError::eNone) + { + } + + AuRPCResponse(ERPCError error) : error(error) + { + } + + ERPCError error; + AuUInt64 cookie {}; + AuByteBuffer *message {}; + + void Deserialize() + { + this->error = AuStaticCast(message->Read()); + this->cookie = message->Read(); + } + + void PrepareMessage() + { + message->Write(0); + message->Write(this->cookie); + } + + void WriteError() + { + message->Write(AuStaticCast(this->error)); + message->Write(this->cookie); + } +}; + +struct AuRPCResponseOwned : AuRPCResponse +{ + AuSPtr buffer; + + void PrepareResponse(AuUInt8 type) + { + buffer = AuMakeShared(); + SysAssert(buffer); + this->message = buffer.get(); + buffer->Write(0); + buffer->Write(type); + } + + void FinalizeWrite() + { + auto length = buffer->writePtr - buffer->base; + buffer->writePtr = buffer->base; + buffer->Write(length); + buffer->writePtr += (length - 4); + } +}; + +AUI_INTERFACE(AuRPCRequestCallback, + AUI_METHOD(void, OnResponse, (const AuRPCResponse &, response)) +); + +AUI_INTERFACE(AuIRPCChannelCallbacks, + AUI_METHOD(void, OnConnect, ()), + AUI_METHOD(void, OnDisconnect, ()) +); + +struct AuIRPCRequest +{ + virtual bool SetData(const AuByteBuffer &toRead) = 0; + virtual bool SetData(const AuMemoryViewRead &view) = 0; + + virtual void SetCallback(AuSPtr callback) = 0; +}; + +struct AuIRPCClientChannel +{ + virtual void Disconnect() = 0; + virtual void SendRequest(AuSPtr response) = 0; + virtual bool IsConnected() = 0; + virtual void SetCallbacks(const AuSPtr &callbacks) = 0; +}; + +struct AuIRPCService +{ + virtual AuUInt32 GetId() = 0; + virtual void Dispatch(AuRPCResponse &response, AuUInt32 id, AuByteBuffer& buffer) = 0; +}; + + +struct AuIRPCServer +{ + virtual bool RegisterService(const AuSPtr service) = 0; + virtual AuString ExportString() = 0; +}; + +struct AuIRPC +{ + virtual bool StartClient(AuAsync::WorkerPId_t worker) = 0; + virtual bool StartServer(AuAsync::WorkerPId_t worker) = 0; + + virtual AuSPtr ToServer() = 0; + virtual AuSPtr Connect(const AuString& str) = 0; +}; + +AuSPtr AuRPCNewRequest(AuUInt32 serviceId, AuUInt32 methodId); +AuSPtr AuRPCNewInstance(); \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..cd213d2 --- /dev/null +++ b/LICENSE @@ -0,0 +1,8 @@ +Copyright 2022 J Reece Wilson + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..8e4636f --- /dev/null +++ b/README.md @@ -0,0 +1,68 @@ +# Experimental RPC library + + +## Example + +```cpp +#include + +static AuSPtr gRpcServer = AuRPCNewInstance(); +static AuSPtr gRpcClient = AuRPCNewInstance(); +static AuSPtr gRpcClientChannel; + +void Run() +{ + SysAssert(gRpcServer->StartServer(AuAsync::GetCurrentWorkerPId())); + SysAssert(gRpcClient->StartClient(AuAsync::GetCurrentWorkerPId())); + + auto server = gRpcServer->ToServer(); + + auto handle = server->ExportString(); + + struct DummyService : AuIRPCService + { + AuUInt32 GetId() override + { + return 2; + } + + void Dispatch(AuRPCResponse &response, AuUInt32 id, AuByteBuffer &buffer) override + { + AuLogDbg("ID = {}, bytes = {}", id, buffer.RemainingBytes()); + + response.message->Write("Hello World"); + } + }; + + server->RegisterService(AuMakeShared()); + + gRpcClientChannel = gRpcClient->Connect(handle); + SysAssert(gRpcClientChannel); + + auto callback = AuMakeShared(); + SysAssert(callback); + + callback->OnConnectFunctional = []() + { + auto request = AuRPCNewRequest(2, 3); + SysAssert(request); + + auto callback = AuMakeShared(); + SysAssert(callback); + + callback->OnResponseFunctional = [](const AuRPCResponse &res) + { + AuLogDbg("Complete! Message = {}", res.message->Read()); + }; + + request->SetCallback(callback); + gRpcClientChannel->SendRequest(request); + }; + + gRpcClientChannel->SetCallbacks(callback); +} +``` + + + +__Linux IO is lagging behind__ \ No newline at end of file diff --git a/Source/AuRPC.cpp b/Source/AuRPC.cpp new file mode 100644 index 0000000..b87e4a8 --- /dev/null +++ b/Source/AuRPC.cpp @@ -0,0 +1,66 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPC.cpp + Date: 2022-6-29 + Author: Reece +***/ +#include +#include "AuRPC.hpp" +#include "AuRPCClientChannel.hpp" + +static thread_local AuSPtr tlsIOProcessor; + +AuSPtr GetRPCProcessor() +{ + return tlsIOProcessor ? + tlsIOProcessor : + tlsIOProcessor = AuIO::NewIOProcessorOnThread(false, AuAsync::GetCurrentWorkerPId()); +} + +bool AuRPC::StartClient(AuAsync::WorkerPId_t worker) +{ + if (worker.second == AuAsync::kThreadIdAny) + { + return false; + } + + this->pinnedClientThread = worker; + return true; +} + +bool AuRPC::StartServer(AuAsync::WorkerPId_t worker) +{ + return this->server.Init(this, worker); +} + +AuSPtr AuRPC::ToServer() +{ + return AuSPtr(AuSharedFromThis(), &this->server); +} + +AuSPtr AuRPC::Connect(const AuString& str) +{ + auto eh = AuMakeShared(this->SharedFromThis()); + if (!eh) + { + return {}; + } + + //if (!AuTryInsert(this->clientChannels, eh)) + //{ + // return {}; + //} + + if (!eh->Init(str)) + { + return {}; + } + + return eh; +} + +AuSPtr AuRPCNewInstance() +{ + return AuMakeShared(); +} \ No newline at end of file diff --git a/Source/AuRPC.hpp b/Source/AuRPC.hpp new file mode 100644 index 0000000..f06ca41 --- /dev/null +++ b/Source/AuRPC.hpp @@ -0,0 +1,59 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPC.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +struct Session; +struct AuRPCServer; +struct AuRPCServerChannel; +struct AuRPCClientChannel; +struct AuRPCRequest; +struct AuRPC; +struct AuRPCRequestCallback; +struct AuIRPCService; + +#include "AuRPCAPI.hpp" +#include + +#include "AuRPCServer.hpp" + +struct AuRPC : AuIRPC, AuEnableSharedFromThis +{ + + bool StartClient(AuAsync::WorkerPId_t worker); + bool StartServer(AuAsync::WorkerPId_t worker); + + AuSPtr ToServer(); + AuSPtr Connect(const AuString& str); + +private: + + AuAsync::WorkerPId_t pinnedClientThread; + AuRPCServer server; + AuList> clientChannels; +}; + +AuSPtr GetRPCProcessor(); + +static const auto kRequestConnect = 1; +static const auto kRequestRPC = 2; + +static const auto kResponseConnectOK = 10; +static const auto kResponseMulticonnect = 11; +static const auto kResponseRPC = 12; + +static const AuUInt8 kResponseError[512] = {0}; + +#if (defined(AU_CFG_ID_DEBUG) || defined(AU_CFG_ID_INTERNAL__) /*!!!*/) && !defined(AU_RPC_DEBUG) + #define AU_RPC_DEBUG +#endif + +#if defined(AU_RPC_DEBUG) + #define RpcLogDebug AuLogDbg +#else + #define RpcLogDebug(...) +#endif \ No newline at end of file diff --git a/Source/AuRPCChannel.hpp b/Source/AuRPCChannel.hpp new file mode 100644 index 0000000..d897b5d --- /dev/null +++ b/Source/AuRPCChannel.hpp @@ -0,0 +1,16 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCChannel.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +struct AuRPCChannel +{ + virtual AuSPtr ToContext() = 0; + virtual bool OnConnect() = 0; + virtual void OnDisconnect(bool error) = 0; + virtual bool OnDataAvailable(AuByteBuffer& view) = 0; +}; diff --git a/Source/AuRPCClientChannel.cpp b/Source/AuRPCClientChannel.cpp new file mode 100644 index 0000000..f49e5a7 --- /dev/null +++ b/Source/AuRPCClientChannel.cpp @@ -0,0 +1,198 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCClientChannel.cpp + Date: 2022-6-29 + Author: Reece +***/ +#include +#include "AuRPC.hpp" +#include "AuRPCClientChannel.hpp" +#include "AuRPCRequest.hpp" +#include "AuRPCPipePacket.hpp" + +bool AuRPCClientChannel::OnConnect() +{ + auto request = AuMakeShared(); + request->WriteHeaderConnect(); + AuRPCPipePacket packet; + packet.clientRequest = request; + packet.protPacket = true; + packet.clientChannel = SharedFromThis(); + this->pipe_.SendPacket(packet); + return true; +} + +void AuRPCClientChannel::Disconnect() +{ + this->pipe_.Deinit(); + Finalize(); +} + +void AuRPCClientChannel::SendRequest(AuSPtr response2) +{ + auto response = AuStaticCast(response2); + if (!response) + { + SysPushErrorArg(); + return; + } + + RpcLogDebug("Client sending request of bytes: {}", response->GetData().length); + + AuRPCPipePacket packet; + packet.clientRequest = response; + packet.clientChannel = SharedFromThis(); + this->pipe_.SendPacket(packet); +} + +AuSPtr AuRPCClientChannel::ToContext() +{ + return this->parent_; +} + +void AuRPCClientChannel::OnDisconnect(bool error) +{ + RpcLogDebug("AuRPCClientChannel::OnDisconnect"); +} + +bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view) +{ + RpcLogDebug("AuRPCClientChannel::OnDataAvailable"); + + do + { + auto bytesAvailable = view.RemainingBytes(); + + if (bytesAvailable < 4) + { + return true; + } + + auto oldRead = view.readPtr; + + if (view.Read() > bytesAvailable) + { + view.readPtr = oldRead; + return true; + } + + auto packetType = view.Read(); + + if (packetType == kResponseConnectOK) + { + this->bConnected_ = true; + this->ProcessConnectionOK(); + } + else if (packetType == kResponseMulticonnect) + { + this->pipe_.Deinit(); + this->bConnectingAlternate_ = true; + this->Init(view.Read()); + } + else if (packetType == kResponseRPC) + { + auto response = AuMakeShared(); + if (!response) + { + SysPushErrorMem(); + view.readPtr = oldRead; + return true; + } + + response->message = &view; + + response->Deserialize(); + this->ProcessResponse(response); + } + + } while (true); + + return true; +} + +bool AuRPCClientChannel::Init(const AuString &ipc) +{ + return this->pipe_.Init(ipc); +} + +void AuRPCClientChannel::FatalIOError() +{ + RpcLogDebug("AuRPCClientChannel::FatalIOError"); + Disconnect(); +} + +void AuRPCClientChannel::ProcessConnectionOK() +{ + auto re = AuExchange(this->outstandingRequests, AuList>{}); + + if (this->callbacks_) + { + this->callbacks_->OnConnect(); + } + + for (auto a : re) + { + this->SendRequest(a); + } +} + +void AuRPCClientChannel::ProcessResponse(const AuSPtr &response) +{ + RpcLogDebug("AuRPCClientChannel::ProcessResponse"); + + for (auto itr = this->outstandingRequests.begin(); + itr != this->outstandingRequests.end(); + ) + { + if (response->cookie != AuUInt(itr->get())) + { + itr++; + continue; + } + + auto req = *itr; + this->outstandingRequests.erase(itr); + + if (req->callback) + { + req->callback->OnResponse(*response); + } + } +} + +bool AuRPCClientChannel::IsConnected() +{ + return this->bConnected_; +} + +void AuRPCClientChannel::SetCallbacks(const AuSPtr &callbacks) +{ + if (!callbacks) + { + SysPushErrorArg(); + return; + } + + if (IsConnected()) + { + callbacks->OnConnect(); + } + + this->callbacks_ = callbacks; +} + +void AuRPCClientChannel::Finalize() +{ + auto re = AuExchange(this->outstandingRequests, AuList>{}); + + for (auto a : re) + { + a->callback->OnResponse(AuRPCResponse(ERPCError::eAborted)); + } + + if (this->callbacks_) + { + this->callbacks_->OnDisconnect(); + } +} diff --git a/Source/AuRPCClientChannel.hpp b/Source/AuRPCClientChannel.hpp new file mode 100644 index 0000000..6323e0e --- /dev/null +++ b/Source/AuRPCClientChannel.hpp @@ -0,0 +1,47 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCClientChannel.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +#include "AuRPCChannel.hpp" +#include "AuRPCPipe.hpp" + +struct AuRPCClientChannel : AuRPCChannel, AuIRPCClientChannel, AuEnableSharedFromThis +{ + AuRPCClientChannel(AuSPtr parent) : parent_(parent), pipe_(this) + { + } + + bool Init(const AuString& ipc); + void Finalize(); + + void Disconnect() override; + void SendRequest(AuSPtr response) override; + + void FatalIOError(); + + virtual AuSPtr ToContext() override; + virtual bool OnConnect() override; + virtual void OnDisconnect(bool error) override; + virtual bool OnDataAvailable(AuByteBuffer& view) override; + + void ProcessConnectionOK(); + void ProcessResponse(const AuSPtr& response); + + AuList> outstandingRequests; + + virtual bool IsConnected() override; + virtual void SetCallbacks(const AuSPtr &callbacks) override; + + +private: + bool bConnectingAlternate_{}; + bool bConnected_{}; + AuSPtr callbacks_; + AuRPCPipe pipe_; + AuSPtr parent_; +}; \ No newline at end of file diff --git a/Source/AuRPCPipe.cpp b/Source/AuRPCPipe.cpp new file mode 100644 index 0000000..0bd3bae --- /dev/null +++ b/Source/AuRPCPipe.cpp @@ -0,0 +1,268 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCPipe.cpp + Date: 2022-6-29 + Author: Reece +***/ +#include +#include "AuRPC.hpp" +#include "AuRPCPipe.hpp" +#include "AuRPCPipePacket.hpp" +#include "AuRPCRequest.hpp" +#include "AuRPCChannel.hpp" +#include "AuRPCClientChannel.hpp" + +bool AuRPCPipe::SendPacket(const AuRPCPipePacket &packet) +{ + + struct RPCTransaction : AuIO::IAsyncFinishedSubscriber, AuEnableSharedFromThis + { + AuSPtr transaction; + AuMemoryViewRead view; + AuRPCPipePacket packet; + + void OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) override + { + transaction->SetCallback({}); + if (length != view.length) + { + this->Fail(); + if (this->packet.clientChannel) + { + this->packet.clientChannel->FatalIOError(); + } + } + else + { + this->Sent(); + } + } + + bool SendClient() + { + if (packet.protPacket) + { + return true; + } + + if (!packet.clientRequest) + { + return false; + } + + packet.clientRequest->state = ERPCRequestState::eSubmitting; + + if (!AuTryInsert(packet.clientChannel->outstandingRequests, packet.clientRequest)) + { + return false; + } + + return true; + } + + void Fail() + { + if (packet.clientChannel) + { + packet.clientRequest->callback->OnResponse(AuRPCResponse(ERPCError::eIOError)); + packet.clientRequest->state = ERPCRequestState::eFailed; + } + + this->Remove(); + } + + void Remove() + { + if (packet.clientChannel) + { + AuTryRemove(packet.clientChannel->outstandingRequests, packet.clientRequest); + } + } + + void Sent() + { + if (packet.clientRequest) + { + packet.clientRequest->state = ERPCRequestState::eSent; + } + } + }; + + auto ioTransaction = this->pipe->NewAsyncTransaction(); + if (!ioTransaction) + { + return false; + } + + auto transactionObject = AuMakeShared(); + if (!transactionObject) + { + return false; + } + + transactionObject->packet = packet; + transactionObject->view = packet.clientRequest ? packet.clientRequest->GetData() : *packet.serverResponse->message; + transactionObject->transaction = ioTransaction; + + transactionObject->transaction->SetCallback(transactionObject); + + transactionObject->SendClient(); + + if (!transactionObject->transaction->StartWrite(0, AuSPtr(transactionObject->SharedFromThis(), &transactionObject->view))) + { + transactionObject->Fail(); + } + + return true; +} + + +AuRPCPipe::~AuRPCPipe() +{ + Deinit(); +} + +AuRPCPipe::AuRPCPipe(AuRPCChannel* channel) : channel(channel) +{ + +} + +// IIOEventListenerFunctional +void AuRPCPipe::OnIOTick() +{ + this->OnConnect(); +} + +void AuRPCPipe::OnIOFailure() +{ + SysPushErrorIO("IO Connect Failure"); + OnFatalError(); +} + +void AuRPCPipe::OnIOComplete() +{ + //looking for who the fuck asked + //AuLogDbg("Connection watch operation completed successfully"); +} + +// Pipe callbacks +void AuRPCPipe::OnPipePartialEvent(AuUInt trasnferred) +{ + RpcLogDebug("Consumed bytes"); +} + +void AuRPCPipe::OnPipeSuccessEvent() +{ + RpcLogDebug("Waiting for next socket"); + this->isOpenWork = GetRPCProcessor()->StartSimpleLSWatch(this->pipe->AsReadChannelIsOpen(), + AuSPtr(this->channel->ToContext(), this)); +} + +void AuRPCPipe::OnPipeFailureEvent() +{ + RpcLogDebug("Couldn't listen..."); + this->OnFatalError(); +} + +// IIOBufferedStreamAvailable +bool AuRPCPipe::OnDataAvailable(AuByteBuffer& view) +{ + return this->channel->OnDataAvailable(view); +} + +void AuRPCPipe::OnFatalError() +{ + if (AuExchange(this->errored_, true)) + { + return; + } + + OnError(true); + + AuLogWarn("Disable pipe here"); +} + +void AuRPCPipe::OnError(bool fatal) +{ + this->channel->OnDisconnect(true); +} + +void AuRPCPipe::OnConnect() +{ + RpcLogDebug("RPC Pipe connection initiated. Beginning read..."); + + AuIO::IOPipeRequestAIO req; + req.asyncTransaction = this->pipe->NewAsyncTransaction(); + req.isStream = true; + req.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart; + req.output.handleBufferedStream.onData = AuSPtr(this->channel->ToContext(), this); + req.listener = AuSPtr(this->channel->ToContext(), this); + + // Create a pipe to process an asynchronous repeating stream read transaction of no particular read/EoS limit + this->work = GetRPCProcessor()->ToPipeProcessor()->NewAIOPipe(req); + if (!this->work) + { + SysPushErrorIO("Couldn't create an IO pipe for a new session connection"); + this->OnFatalError(); + return; + } + + // Fire ye pipe consumption + if (!this->work->Start()) + { + SysPushErrorIO("Couldn't start reading the IO pipe"); + this->OnFatalError(); + return; + } + + if (!this->channel->OnConnect()) + { + SysPushErrorIO("Couldn't start the channel"); + this->OnFatalError(); + return; + } +} + +bool AuRPCPipe::Init(const AuString& str) +{ + this->pipe = AuIPC::ImportPipe(str); + if (!this->pipe) + { + SysPushErrorIO("Couldn't open an IPC pipe given handle: {}", str); + return {}; + } + + this->OnConnect(); + this->isClient_ = true; + return true;// WaitForOtherEnd(); +} + +bool AuRPCPipe::Init() +{ + this->pipe = AuIPC::NewPipe(); + if (!this->pipe) + { + SysPushErrorIO("Couldn't spare the resources required for an IPC pipe"); + return {}; + } + + return WaitForOtherEnd(); +} + +bool AuRPCPipe::WaitForOtherEnd() +{ + this->isOpenWork = GetRPCProcessor()->StartSimpleLSWatchEx(this->pipe->AsReadChannelIsOpen(), + AuSPtr(this->channel->ToContext(), this), + true); + return bool(this->isOpenWork); +} + +void AuRPCPipe::Deinit() +{ + if (this->work) + { + work->End(); + work.reset(); + } +} \ No newline at end of file diff --git a/Source/AuRPCPipe.hpp b/Source/AuRPCPipe.hpp new file mode 100644 index 0000000..dd7c8a8 --- /dev/null +++ b/Source/AuRPCPipe.hpp @@ -0,0 +1,49 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCPipe.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +struct AuRPCPipePacket; +struct AuRPCChannel; + +struct AuRPCPipe : AuIO::IIOBufferedStreamAvailable, AuIO::IIOSimpleEventListener, AuIO::IIOPipeEventListener +{ + AuRPCPipe(AuRPCChannel* channel); + ~AuRPCPipe(); + AuRPCChannel* channel; + + AuSPtr pipe; + AuSPtr work; + AuSPtr isOpenWork; + bool isClient_{}; + // IIOEventListenerFunctional + void OnIOTick() override; + void OnIOFailure() override; + void OnIOComplete() override; + + // Pipe callbacks + void OnPipePartialEvent(AuUInt trasnferred) override; + void OnPipeSuccessEvent() override; + void OnPipeFailureEvent() override; + + // IIOBufferedStreamAvailable + bool OnDataAvailable(AuByteBuffer& view) override; + + void OnFatalError(); + void OnError(bool fatal); + void OnConnect(); + + bool SendPacket(const AuRPCPipePacket& packet); + + bool Init(const AuString& str); + bool Init(); + bool WaitForOtherEnd(); + void Deinit(); + +private: + bool errored_{}; +}; diff --git a/Source/AuRPCPipePacket.hpp b/Source/AuRPCPipePacket.hpp new file mode 100644 index 0000000..3712fbc --- /dev/null +++ b/Source/AuRPCPipePacket.hpp @@ -0,0 +1,16 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCPipePacket.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +struct AuRPCPipePacket +{ + AuSPtr clientChannel; + AuSPtr clientRequest; + bool protPacket {}; + AuSPtr serverResponse; +}; \ No newline at end of file diff --git a/Source/AuRPCRequest.cpp b/Source/AuRPCRequest.cpp new file mode 100644 index 0000000..3079f71 --- /dev/null +++ b/Source/AuRPCRequest.cpp @@ -0,0 +1,87 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCRequest.cpp + Date: 2022-6-29 + Author: Reece +***/ +#include +#include "AuRPC.hpp" +#include "AuRPCRequest.hpp" + +bool AuRPCRequest::SetData(const AuByteBuffer& toRead) +{ + return this->SetData(AuMemoryViewRead(toRead)); +} + +bool AuRPCRequest::SetData(const AuMemoryViewRead& view) +{ + data.Reset(); + this->packetLength = view.length + this->HeaderLength(); + data.Reserve(view.length + this->HeaderLength()); + if (!this->data) + { + SysPushErrorMem(); + return false; + } + + this->WriteRPCHeader(); + this->data.Write(view.ptr, view.length); + return bool(data); +} + +void AuRPCRequest::SetCallback(AuSPtr callback) +{ + this->callback = callback; +} + +AuMemoryViewRead AuRPCRequest::GetData() +{ + return this->data; +} + +void AuRPCRequest::WriteDummy() +{ + WriteRPCHeader(); +} + +void AuRPCRequest::WriteHeaderConnect() +{ + this->data.Write(5); + this->data.Write(kRequestConnect); +} + +void AuRPCRequest::WriteRPCHeader() +{ + this->data.Write(this->packetLength); + this->data.Write(kRequestRPC); + this->data.Write(this->serviceId); + this->data.Write(this->methodId); + this->data.Write(AuUInt64(AuUInt(this))); +} + +AuUInt32 AuRPCRequest::HeaderLength() +{ + return 4 + 1 + 4 /*service id*/ + 4 /*method*/ + 8 /*uid*/; +} + +ERPCRequestState AuRPCRequest::GetState() +{ + return this->state; +} + +AuSPtr AuRPCNewRequest(AuUInt32 serviceId, AuUInt32 methodId) +{ + auto req = AuMakeShared(); + if (!req) + { + return {}; + } + + req->serviceId = serviceId; + req->methodId = methodId; + + req->WriteRPCHeader(); + + return req; +} \ No newline at end of file diff --git a/Source/AuRPCRequest.hpp b/Source/AuRPCRequest.hpp new file mode 100644 index 0000000..2a13bf5 --- /dev/null +++ b/Source/AuRPCRequest.hpp @@ -0,0 +1,36 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCRequest.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +struct AuRPCRequest : AuIRPCRequest +{ + AuUInt32 serviceId{}; + ERPCRequestState state{}; + AuUInt32 methodId{}; + AuSPtr callback; + AuUInt64 cookie {}; + + bool SetData(const AuByteBuffer& toRead) override; + bool SetData(const AuMemoryViewRead& view) override; + + void SetCallback(AuSPtr callback) override; + + ERPCRequestState GetState(); + + AuMemoryViewRead GetData(); + + void WriteDummy(); + void WriteHeaderConnect(); + void WriteRPCHeader(); + AuUInt32 HeaderLength(); + +private: + AuUInt32 packetLength{}; + AuByteBuffer data; +}; + diff --git a/Source/AuRPCServer.cpp b/Source/AuRPCServer.cpp new file mode 100644 index 0000000..f44f42d --- /dev/null +++ b/Source/AuRPCServer.cpp @@ -0,0 +1,133 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCServer.cpp + Date: 2022-6-29 + Author: Reece +***/ +#include +#include "AuRPC.hpp" +#include "AuRPCServer.hpp" +#include "AuRPCServerChannel.hpp" +#include "AuRPCRequest.hpp" + +bool AuRPCServer::Init(AuRPC *parent, AuAsync::WorkerPId_t worker) +{ + if (this->channel) + { + SysPushErrorGen("Double init"); + return {}; + } + + this->lock = AuThreadPrimitives::RWLockUnique(); + if (!this->lock) + { + SysPushErrorMem(); + return {}; + } + + this->worker = worker; + this->parent = parent; + this->channel = NewChannel(); + + if (!this->channel) + { + return false; + } + + this->channel->MakeTemp(); + return true; +} + +bool AuRPCServer::RegisterService(const AuSPtr service) +{ + if (!service) + { + SysPushErrorArg(); + return {}; + } + + AU_LOCK_GUARD(this->lock->AsWritable()); + return AuTryInsert(this->serviceTable, service->GetId(), service); +} + +AuString AuRPCServer::ExportString() +{ + return ToPrimaryChannel()->ExportString(); +} + +AuSPtr AuRPCServer::ToPrimaryChannel() +{ + if (this->channel) + { + return this->channel; + } + + return this->channel = NewChannel(); +} + +void AuRPCServer::Dispatch(AuRPCServerChannel *channel, + const AuSPtr &request, + AuByteBuffer *buffer) +{ + AU_LOCK_GUARD(this->lock->AsReadable()); + + auto response = AuMakeShared(); + SysAssert(response); + + response->cookie = request->cookie; + response->PrepareResponse(kResponseRPC); + + auto itr = this->serviceTable.find(request->serviceId); + if (itr == this->serviceTable.end()) + { + response->FinalizeWrite(); + channel->SendResponse(response); + return; + } + + try + { + response->PrepareMessage(); + + itr->second->Dispatch(*response, request->methodId, *buffer); + + channel->SendResponse(response); + response->FinalizeWrite(); + } + catch (...) + { + SysPushErrorCatch(); + channel->FatalError(); + } +} + +AuSPtr AuRPCServer::NewChannel() +{ + auto eh = AuMakeShared(this->parent->SharedFromThis(), AuSPtr(this->parent->SharedFromThis(), this)); + if (!eh) + { + return {}; + } + + if ((this->worker.second != AuAsync::kThreadIdAny) && + (this->worker == AuAsync::GetCurrentWorkerPId())) + { + if (!eh->Init()) + { + return {}; + } + } + else + { + AuAsync::NewWorkItem(this->worker, AuMakeShared([&]() + { + if (!eh->Init()) + { + eh.reset(); + } + }), true)->Dispatch()->BlockUntilComplete(); + } + + return eh; +} diff --git a/Source/AuRPCServer.hpp b/Source/AuRPCServer.hpp new file mode 100644 index 0000000..fc8de0a --- /dev/null +++ b/Source/AuRPCServer.hpp @@ -0,0 +1,33 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCServer.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +struct AuRPCRequest; +struct AuRPCServerChannel; + +struct AuRPCServer : AuIRPCServer +{ + bool Init(AuRPC* parent, AuAsync::WorkerPId_t worker); + bool RegisterService(const AuSPtr service) override; + + AuString ExportString() override; + AuSPtr ToPrimaryChannel(); + AuSPtr NewChannel(); + + void Dispatch(AuRPCServerChannel *channel, + const AuSPtr &request, + AuByteBuffer *buffer); + +private: + + AuRPC* parent; + AuSPtr channel; + AuAsync::WorkerPId_t worker; + AuThreadPrimitives::RWLockUnique_t lock; + AuHashMap> serviceTable; +}; \ No newline at end of file diff --git a/Source/AuRPCServerChannel.cpp b/Source/AuRPCServerChannel.cpp new file mode 100644 index 0000000..6d433a4 --- /dev/null +++ b/Source/AuRPCServerChannel.cpp @@ -0,0 +1,169 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCServerChannel.cpp + Date: 2022-6-29 + Author: Reece +***/ +#include +#include "AuRPC.hpp" +#include "AuRPCServerChannel.hpp" +#include "AuRPCRequest.hpp" +#include "AuRPCPipePacket.hpp" + +AuRPCServerChannel::AuRPCServerChannel(AuSPtr parent, AuSPtr server) : + parent(parent), + pipe(this), + server_(server) +{ + +} + +AuSPtr AuRPCServerChannel::ToContext() +{ + return this->parent; +} + +void AuRPCServerChannel::SendResponse(AuSPtr response) +{ + if (response->message->flagWriteError) + { + this->FatalError(); + return; + } + + AuRPCPipePacket packet; + packet.serverResponse = response; + this->pipe.SendPacket(packet); +} + +AuString AuRPCServerChannel::ExportString() +{ + return this->pipe.pipe->ExportToString(); +} + +bool AuRPCServerChannel::OnConnect() +{ + RpcLogDebug("Server channel: on connect"); + return true; +} + +void AuRPCServerChannel::OnDisconnect(bool error) +{ + RpcLogDebug("Server channel disconnected: {}", error); +} + +bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view) +{ + do + { + auto bytesAvailable = view.RemainingBytes(); + + if (bytesAvailable < 4) + { + return true; + } + + auto oldRead = view.readPtr; + + if (view.Read() > bytesAvailable) + { + view.readPtr = oldRead; + return true; + } + + auto packetType = view.Read(); + + if (packetType == kRequestConnect) + { + if (this->isTempChannel_) + { + this->SendToNewChannel(); + } + else + { + this->SendConnectOK(); + } + } + else if (packetType == kRequestRPC) + { + auto request = AuMakeShared(); + if (!request) + { + SysPushErrorMem(); + view.readPtr = oldRead; + return true; + } + + request->serviceId = view.Read(); + request->methodId = view.Read(); + request->cookie = view.Read(); + + this->server_->Dispatch(this, request, &view); + } + + } + while (true); + + return true; +} + +void AuRPCServerChannel::SendConnectOK() +{ + RpcLogDebug("AuRPCServerChannel::SendConnectOK"); + + auto res = AuMakeShared(); + if (!res) + { + FatalError(); + return; + } + + res->PrepareResponse(kResponseConnectOK); + res->FinalizeWrite(); + + this->SendResponse(res); +} + +void AuRPCServerChannel::SendToNewChannel() +{ + RpcLogDebug("AuRPCServerChannel::SendToNewChannel"); + + auto res = AuMakeShared(); + if (!res) + { + FatalError(); + return; + } + + res->PrepareResponse(kResponseMulticonnect); + auto channel = this->server_->NewChannel(); + if (!channel) + { + this->FatalError(); + return; + } + + if (!AuTryInsert(this->subchannels_, channel)) + { + this->FatalError(); + return; + } + + res->buffer->Write(channel->ExportString()); + + res->FinalizeWrite(); + + this->SendResponse(res); +} + +void AuRPCServerChannel::FatalError() +{ + RpcLogDebug("AuRPCServerChannel::FatalError"); + this->pipe.Deinit(); +} + +bool AuRPCServerChannel::Init() +{ + return this->pipe.Init(); +} diff --git a/Source/AuRPCServerChannel.hpp b/Source/AuRPCServerChannel.hpp new file mode 100644 index 0000000..251d999 --- /dev/null +++ b/Source/AuRPCServerChannel.hpp @@ -0,0 +1,43 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuRPCServerChannel.hpp + Date: 2022-6-29 + Author: Reece +***/ +#pragma once + +#include "AuRPCChannel.hpp" +#include "AuRPCPipe.hpp" + +struct AuRPCServerChannel : AuRPCChannel +{ + AuRPCServerChannel(AuSPtr parent, AuSPtr server); + + bool Init(); + AuString ExportString(); + + void SendResponse(AuSPtr response); + + virtual AuSPtr ToContext() override; + virtual bool OnConnect() override; + virtual void OnDisconnect(bool error) override; + virtual bool OnDataAvailable(AuByteBuffer& view) override; + + void FatalError(); + + void SendToNewChannel(); + void SendConnectOK(); + + inline void MakeTemp() + { + this->isTempChannel_ = true; + } +private: + + AuList> subchannels_; + AuSPtr server_; + bool isTempChannel_ {}; + AuRPCPipe pipe; + AuSPtr parent; +};