Initial Commit

This commit is contained in:
Reece Wilson 2022-07-02 23:08:52 +01:00
commit 82f87679d2
19 changed files with 1659 additions and 0 deletions

218
.gitignore vendored Normal file
View File

@ -0,0 +1,218 @@
# Aurora's general purpose JS/TS/C/C++/Go vs/vscode/intellij/codelite gitignore reference
# Almost usable for Java and Qt
# Aurora build configuration
Build_CompilerWorkingDirectory/*
Build_Developers/*
Build_Ship/*
Build_Internal/*
Build_Develop/*
Build_Stage/*
Build_Ship/*
Build_Workspace/*
Build_Symbols/*
Build_Link/*
Build/Developers/*
Build/Ship/*
Build/Develop/*
Build/Stage/*
Build/Ship/*
Build/Workspace/*
Build/Symbols/*
Build/Link/*
# License Headers VS extension
*.licenseheader
# Binaries / object files
*.dll
*.exe
*.obj
*.so
*.so.*
*.la
*.lai
*.pdb
*.idb
*.exe~
*.obj
*.dynlib
*.dylib
*.lib
*.d
*.o
*.a
*.la
*.slo
*.lo
*.out
# go unit test
*.test
# Autogenerated project files
compile_flags.txt
*.mk
*.project
*cmake
Makefile
*.vcxproj
*.xcodeproj
# IDE trash
.vscode
.vs
/*.gcno
.intellij
.clion
*.vcxproj.filters
*.vcxproj.user
*.tlog
# OSX
.DS_Store
.AppleDouble
.LSOverride
xcuserdata/
# Win32
Thumbs.db
Thumbs.db:encryptable
ehthumbs.db
ehthumbs_vista.db
*.lnk
# Linux is trash and cant hotswap like NT
.nfs*
.fuse_hidden*
# Ninja
.ninja_deps
.ninja_log
# PID locks
*.pid
*.pid.lock
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# JetBrains
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# CMake
cmake-build-*/
# Android Studio
.idea/caches/build_file_checksums.ser
# why would we ever ship this dir?
.idea/caches/*
# NodeJS
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*
# node-waf configuration
.lock-wscript
# Dependency directories
node_modules/
jspm_packages/
# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# yarn v2
.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*
# VS Code Extensions
.vscode-test
# Qt unit tests
target_wrapper.*
# QtCreator
*.autosave
# QtCreator Qml
*.qmlproject.user
*.qmlproject.user.*
# QtCreator CMake
CMakeLists.txt.user*
# QtCreator 4.8< compilation database
compile_commands.json
# QtCreator local machine specific files for imported projects
*creator.user*
*_qmlcache.qrc
# QT cache and user files
/.qmake.cache
/.qmake.stash
*.pro.user
*.pro.user.*
*.qbs.user
*.qbs.user.*
*.moc
# Java trash
hs_err_pid*
.gradle
gradle-app.setting
!gradle-wrapper.jar
.gradletasknamecache
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar

7
Aurora.json Normal file
View File

@ -0,0 +1,7 @@
{
"type": "aurora",
"projectType": "StaticLib",
"name": "AuroraRPC",
"include-depends": ["AuroraRuntime"],
"depends": ["AuroraRuntime"]
}

138
Include/AuRPCAPI.hpp Normal file
View File

@ -0,0 +1,138 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCAPI.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
struct AuRPCRequest;
AUE_DEFINE(ERPCRequestState, (
ePending,
eSubmitting,
eSent,
eResponse,
eFailed
));
AUE_DEFINE(ERPCError, (
eNone,
eMissingService,
eMissingMethod,
eProtocolError,
eIOError,
eChannelTerminated,
eChannelFailure,
eAborted
));
struct AuRPCResponse
{
virtual ~AuRPCResponse()
{
}
AuRPCResponse() : error(ERPCError::eNone)
{
}
AuRPCResponse(ERPCError error) : error(error)
{
}
ERPCError error;
AuUInt64 cookie {};
AuByteBuffer *message {};
void Deserialize()
{
this->error = AuStaticCast<ERPCError>(message->Read<AuUInt8>());
this->cookie = message->Read<AuUInt64>();
}
void PrepareMessage()
{
message->Write<AuUInt8>(0);
message->Write<AuUInt64>(this->cookie);
}
void WriteError()
{
message->Write<AuUInt8>(AuStaticCast<AuUInt8>(this->error));
message->Write<AuUInt64>(this->cookie);
}
};
struct AuRPCResponseOwned : AuRPCResponse
{
AuSPtr<AuByteBuffer> buffer;
void PrepareResponse(AuUInt8 type)
{
buffer = AuMakeShared<AuByteBuffer>();
SysAssert(buffer);
this->message = buffer.get();
buffer->Write<AuUInt32>(0);
buffer->Write(type);
}
void FinalizeWrite()
{
auto length = buffer->writePtr - buffer->base;
buffer->writePtr = buffer->base;
buffer->Write<AuUInt32>(length);
buffer->writePtr += (length - 4);
}
};
AUI_INTERFACE(AuRPCRequestCallback,
AUI_METHOD(void, OnResponse, (const AuRPCResponse &, response))
);
AUI_INTERFACE(AuIRPCChannelCallbacks,
AUI_METHOD(void, OnConnect, ()),
AUI_METHOD(void, OnDisconnect, ())
);
struct AuIRPCRequest
{
virtual bool SetData(const AuByteBuffer &toRead) = 0;
virtual bool SetData(const AuMemoryViewRead &view) = 0;
virtual void SetCallback(AuSPtr<AuRPCRequestCallback> callback) = 0;
};
struct AuIRPCClientChannel
{
virtual void Disconnect() = 0;
virtual void SendRequest(AuSPtr<AuIRPCRequest> response) = 0;
virtual bool IsConnected() = 0;
virtual void SetCallbacks(const AuSPtr<AuIRPCChannelCallbacks> &callbacks) = 0;
};
struct AuIRPCService
{
virtual AuUInt32 GetId() = 0;
virtual void Dispatch(AuRPCResponse &response, AuUInt32 id, AuByteBuffer& buffer) = 0;
};
struct AuIRPCServer
{
virtual bool RegisterService(const AuSPtr<AuIRPCService> service) = 0;
virtual AuString ExportString() = 0;
};
struct AuIRPC
{
virtual bool StartClient(AuAsync::WorkerPId_t worker) = 0;
virtual bool StartServer(AuAsync::WorkerPId_t worker) = 0;
virtual AuSPtr<AuIRPCServer> ToServer() = 0;
virtual AuSPtr<AuIRPCClientChannel> Connect(const AuString& str) = 0;
};
AuSPtr<AuIRPCRequest> AuRPCNewRequest(AuUInt32 serviceId, AuUInt32 methodId);
AuSPtr<AuIRPC> AuRPCNewInstance();

8
LICENSE Normal file
View File

@ -0,0 +1,8 @@
Copyright 2022 J Reece Wilson
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

68
README.md Normal file
View File

@ -0,0 +1,68 @@
# Experimental RPC library
## Example
```cpp
#include <AuRPCAPI.hpp>
static AuSPtr<AuIRPC> gRpcServer = AuRPCNewInstance();
static AuSPtr<AuIRPC> gRpcClient = AuRPCNewInstance();
static AuSPtr<AuIRPCClientChannel> gRpcClientChannel;
void Run()
{
SysAssert(gRpcServer->StartServer(AuAsync::GetCurrentWorkerPId()));
SysAssert(gRpcClient->StartClient(AuAsync::GetCurrentWorkerPId()));
auto server = gRpcServer->ToServer();
auto handle = server->ExportString();
struct DummyService : AuIRPCService
{
AuUInt32 GetId() override
{
return 2;
}
void Dispatch(AuRPCResponse &response, AuUInt32 id, AuByteBuffer &buffer) override
{
AuLogDbg("ID = {}, bytes = {}", id, buffer.RemainingBytes());
response.message->Write<AuString>("Hello World");
}
};
server->RegisterService(AuMakeShared<DummyService>());
gRpcClientChannel = gRpcClient->Connect(handle);
SysAssert(gRpcClientChannel);
auto callback = AuMakeShared<AuIRPCChannelCallbacksFunctional>();
SysAssert(callback);
callback->OnConnectFunctional = []()
{
auto request = AuRPCNewRequest(2, 3);
SysAssert(request);
auto callback = AuMakeShared<AuRPCRequestCallbackFunctional>();
SysAssert(callback);
callback->OnResponseFunctional = [](const AuRPCResponse &res)
{
AuLogDbg("Complete! Message = {}", res.message->Read<AuString>());
};
request->SetCallback(callback);
gRpcClientChannel->SendRequest(request);
};
gRpcClientChannel->SetCallbacks(callback);
}
```
__Linux IO is lagging behind__

66
Source/AuRPC.cpp Normal file
View File

@ -0,0 +1,66 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPC.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCClientChannel.hpp"
static thread_local AuSPtr<AuIO::IIOProcessor> tlsIOProcessor;
AuSPtr<AuIO::IIOProcessor> GetRPCProcessor()
{
return tlsIOProcessor ?
tlsIOProcessor :
tlsIOProcessor = AuIO::NewIOProcessorOnThread(false, AuAsync::GetCurrentWorkerPId());
}
bool AuRPC::StartClient(AuAsync::WorkerPId_t worker)
{
if (worker.second == AuAsync::kThreadIdAny)
{
return false;
}
this->pinnedClientThread = worker;
return true;
}
bool AuRPC::StartServer(AuAsync::WorkerPId_t worker)
{
return this->server.Init(this, worker);
}
AuSPtr<AuIRPCServer> AuRPC::ToServer()
{
return AuSPtr<AuRPCServer>(AuSharedFromThis(), &this->server);
}
AuSPtr<AuIRPCClientChannel> AuRPC::Connect(const AuString& str)
{
auto eh = AuMakeShared<AuRPCClientChannel>(this->SharedFromThis());
if (!eh)
{
return {};
}
//if (!AuTryInsert(this->clientChannels, eh))
//{
// return {};
//}
if (!eh->Init(str))
{
return {};
}
return eh;
}
AuSPtr<AuIRPC> AuRPCNewInstance()
{
return AuMakeShared<AuRPC>();
}

59
Source/AuRPC.hpp Normal file
View File

@ -0,0 +1,59 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPC.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
struct Session;
struct AuRPCServer;
struct AuRPCServerChannel;
struct AuRPCClientChannel;
struct AuRPCRequest;
struct AuRPC;
struct AuRPCRequestCallback;
struct AuIRPCService;
#include "AuRPCAPI.hpp"
#include <Aurora/IO/IOExperimental.hpp>
#include "AuRPCServer.hpp"
struct AuRPC : AuIRPC, AuEnableSharedFromThis<AuRPC>
{
bool StartClient(AuAsync::WorkerPId_t worker);
bool StartServer(AuAsync::WorkerPId_t worker);
AuSPtr<AuIRPCServer> ToServer();
AuSPtr<AuIRPCClientChannel> Connect(const AuString& str);
private:
AuAsync::WorkerPId_t pinnedClientThread;
AuRPCServer server;
AuList<AuSPtr<AuRPCClientChannel>> clientChannels;
};
AuSPtr<AuIO::IIOProcessor> GetRPCProcessor();
static const auto kRequestConnect = 1;
static const auto kRequestRPC = 2;
static const auto kResponseConnectOK = 10;
static const auto kResponseMulticonnect = 11;
static const auto kResponseRPC = 12;
static const AuUInt8 kResponseError[512] = {0};
#if (defined(AU_CFG_ID_DEBUG) || defined(AU_CFG_ID_INTERNAL__) /*!!!*/) && !defined(AU_RPC_DEBUG)
#define AU_RPC_DEBUG
#endif
#if defined(AU_RPC_DEBUG)
#define RpcLogDebug AuLogDbg
#else
#define RpcLogDebug(...)
#endif

16
Source/AuRPCChannel.hpp Normal file
View File

@ -0,0 +1,16 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCChannel.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
struct AuRPCChannel
{
virtual AuSPtr<AuRPC> ToContext() = 0;
virtual bool OnConnect() = 0;
virtual void OnDisconnect(bool error) = 0;
virtual bool OnDataAvailable(AuByteBuffer& view) = 0;
};

View File

@ -0,0 +1,198 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCClientChannel.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCClientChannel.hpp"
#include "AuRPCRequest.hpp"
#include "AuRPCPipePacket.hpp"
bool AuRPCClientChannel::OnConnect()
{
auto request = AuMakeShared<AuRPCRequest>();
request->WriteHeaderConnect();
AuRPCPipePacket packet;
packet.clientRequest = request;
packet.protPacket = true;
packet.clientChannel = SharedFromThis();
this->pipe_.SendPacket(packet);
return true;
}
void AuRPCClientChannel::Disconnect()
{
this->pipe_.Deinit();
Finalize();
}
void AuRPCClientChannel::SendRequest(AuSPtr<AuIRPCRequest> response2)
{
auto response = AuStaticCast<AuRPCRequest>(response2);
if (!response)
{
SysPushErrorArg();
return;
}
RpcLogDebug("Client sending request of bytes: {}", response->GetData().length);
AuRPCPipePacket packet;
packet.clientRequest = response;
packet.clientChannel = SharedFromThis();
this->pipe_.SendPacket(packet);
}
AuSPtr<AuRPC> AuRPCClientChannel::ToContext()
{
return this->parent_;
}
void AuRPCClientChannel::OnDisconnect(bool error)
{
RpcLogDebug("AuRPCClientChannel::OnDisconnect");
}
bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view)
{
RpcLogDebug("AuRPCClientChannel::OnDataAvailable");
do
{
auto bytesAvailable = view.RemainingBytes();
if (bytesAvailable < 4)
{
return true;
}
auto oldRead = view.readPtr;
if (view.Read<AuUInt32>() > bytesAvailable)
{
view.readPtr = oldRead;
return true;
}
auto packetType = view.Read<AuUInt8>();
if (packetType == kResponseConnectOK)
{
this->bConnected_ = true;
this->ProcessConnectionOK();
}
else if (packetType == kResponseMulticonnect)
{
this->pipe_.Deinit();
this->bConnectingAlternate_ = true;
this->Init(view.Read<AuString>());
}
else if (packetType == kResponseRPC)
{
auto response = AuMakeShared<AuRPCResponse>();
if (!response)
{
SysPushErrorMem();
view.readPtr = oldRead;
return true;
}
response->message = &view;
response->Deserialize();
this->ProcessResponse(response);
}
} while (true);
return true;
}
bool AuRPCClientChannel::Init(const AuString &ipc)
{
return this->pipe_.Init(ipc);
}
void AuRPCClientChannel::FatalIOError()
{
RpcLogDebug("AuRPCClientChannel::FatalIOError");
Disconnect();
}
void AuRPCClientChannel::ProcessConnectionOK()
{
auto re = AuExchange(this->outstandingRequests, AuList<AuSPtr<AuRPCRequest>>{});
if (this->callbacks_)
{
this->callbacks_->OnConnect();
}
for (auto a : re)
{
this->SendRequest(a);
}
}
void AuRPCClientChannel::ProcessResponse(const AuSPtr<AuRPCResponse> &response)
{
RpcLogDebug("AuRPCClientChannel::ProcessResponse");
for (auto itr = this->outstandingRequests.begin();
itr != this->outstandingRequests.end();
)
{
if (response->cookie != AuUInt(itr->get()))
{
itr++;
continue;
}
auto req = *itr;
this->outstandingRequests.erase(itr);
if (req->callback)
{
req->callback->OnResponse(*response);
}
}
}
bool AuRPCClientChannel::IsConnected()
{
return this->bConnected_;
}
void AuRPCClientChannel::SetCallbacks(const AuSPtr<AuIRPCChannelCallbacks> &callbacks)
{
if (!callbacks)
{
SysPushErrorArg();
return;
}
if (IsConnected())
{
callbacks->OnConnect();
}
this->callbacks_ = callbacks;
}
void AuRPCClientChannel::Finalize()
{
auto re = AuExchange(this->outstandingRequests, AuList<AuSPtr<AuRPCRequest>>{});
for (auto a : re)
{
a->callback->OnResponse(AuRPCResponse(ERPCError::eAborted));
}
if (this->callbacks_)
{
this->callbacks_->OnDisconnect();
}
}

View File

@ -0,0 +1,47 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCClientChannel.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
#include "AuRPCChannel.hpp"
#include "AuRPCPipe.hpp"
struct AuRPCClientChannel : AuRPCChannel, AuIRPCClientChannel, AuEnableSharedFromThis<AuRPCClientChannel>
{
AuRPCClientChannel(AuSPtr<AuRPC> parent) : parent_(parent), pipe_(this)
{
}
bool Init(const AuString& ipc);
void Finalize();
void Disconnect() override;
void SendRequest(AuSPtr<AuIRPCRequest> response) override;
void FatalIOError();
virtual AuSPtr<AuRPC> ToContext() override;
virtual bool OnConnect() override;
virtual void OnDisconnect(bool error) override;
virtual bool OnDataAvailable(AuByteBuffer& view) override;
void ProcessConnectionOK();
void ProcessResponse(const AuSPtr<AuRPCResponse>& response);
AuList<AuSPtr<AuRPCRequest>> outstandingRequests;
virtual bool IsConnected() override;
virtual void SetCallbacks(const AuSPtr<AuIRPCChannelCallbacks> &callbacks) override;
private:
bool bConnectingAlternate_{};
bool bConnected_{};
AuSPtr<AuIRPCChannelCallbacks> callbacks_;
AuRPCPipe pipe_;
AuSPtr<AuRPC> parent_;
};

268
Source/AuRPCPipe.cpp Normal file
View File

@ -0,0 +1,268 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCPipe.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCPipe.hpp"
#include "AuRPCPipePacket.hpp"
#include "AuRPCRequest.hpp"
#include "AuRPCChannel.hpp"
#include "AuRPCClientChannel.hpp"
bool AuRPCPipe::SendPacket(const AuRPCPipePacket &packet)
{
struct RPCTransaction : AuIO::IAsyncFinishedSubscriber, AuEnableSharedFromThis<RPCTransaction>
{
AuSPtr<AuIO::IAsyncTransaction> transaction;
AuMemoryViewRead view;
AuRPCPipePacket packet;
void OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) override
{
transaction->SetCallback({});
if (length != view.length)
{
this->Fail();
if (this->packet.clientChannel)
{
this->packet.clientChannel->FatalIOError();
}
}
else
{
this->Sent();
}
}
bool SendClient()
{
if (packet.protPacket)
{
return true;
}
if (!packet.clientRequest)
{
return false;
}
packet.clientRequest->state = ERPCRequestState::eSubmitting;
if (!AuTryInsert(packet.clientChannel->outstandingRequests, packet.clientRequest))
{
return false;
}
return true;
}
void Fail()
{
if (packet.clientChannel)
{
packet.clientRequest->callback->OnResponse(AuRPCResponse(ERPCError::eIOError));
packet.clientRequest->state = ERPCRequestState::eFailed;
}
this->Remove();
}
void Remove()
{
if (packet.clientChannel)
{
AuTryRemove(packet.clientChannel->outstandingRequests, packet.clientRequest);
}
}
void Sent()
{
if (packet.clientRequest)
{
packet.clientRequest->state = ERPCRequestState::eSent;
}
}
};
auto ioTransaction = this->pipe->NewAsyncTransaction();
if (!ioTransaction)
{
return false;
}
auto transactionObject = AuMakeShared<RPCTransaction>();
if (!transactionObject)
{
return false;
}
transactionObject->packet = packet;
transactionObject->view = packet.clientRequest ? packet.clientRequest->GetData() : *packet.serverResponse->message;
transactionObject->transaction = ioTransaction;
transactionObject->transaction->SetCallback(transactionObject);
transactionObject->SendClient();
if (!transactionObject->transaction->StartWrite(0, AuSPtr<AuMemoryViewRead>(transactionObject->SharedFromThis(), &transactionObject->view)))
{
transactionObject->Fail();
}
return true;
}
AuRPCPipe::~AuRPCPipe()
{
Deinit();
}
AuRPCPipe::AuRPCPipe(AuRPCChannel* channel) : channel(channel)
{
}
// IIOEventListenerFunctional
void AuRPCPipe::OnIOTick()
{
this->OnConnect();
}
void AuRPCPipe::OnIOFailure()
{
SysPushErrorIO("IO Connect Failure");
OnFatalError();
}
void AuRPCPipe::OnIOComplete()
{
//looking for who the fuck asked
//AuLogDbg("Connection watch operation completed successfully");
}
// Pipe callbacks
void AuRPCPipe::OnPipePartialEvent(AuUInt trasnferred)
{
RpcLogDebug("Consumed bytes");
}
void AuRPCPipe::OnPipeSuccessEvent()
{
RpcLogDebug("Waiting for next socket");
this->isOpenWork = GetRPCProcessor()->StartSimpleLSWatch(this->pipe->AsReadChannelIsOpen(),
AuSPtr<AuIO::IIOSimpleEventListener>(this->channel->ToContext(), this));
}
void AuRPCPipe::OnPipeFailureEvent()
{
RpcLogDebug("Couldn't listen...");
this->OnFatalError();
}
// IIOBufferedStreamAvailable
bool AuRPCPipe::OnDataAvailable(AuByteBuffer& view)
{
return this->channel->OnDataAvailable(view);
}
void AuRPCPipe::OnFatalError()
{
if (AuExchange(this->errored_, true))
{
return;
}
OnError(true);
AuLogWarn("Disable pipe here");
}
void AuRPCPipe::OnError(bool fatal)
{
this->channel->OnDisconnect(true);
}
void AuRPCPipe::OnConnect()
{
RpcLogDebug("RPC Pipe connection initiated. Beginning read...");
AuIO::IOPipeRequestAIO req;
req.asyncTransaction = this->pipe->NewAsyncTransaction();
req.isStream = true;
req.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart;
req.output.handleBufferedStream.onData = AuSPtr<AuIO::IIOBufferedStreamAvailable>(this->channel->ToContext(), this);
req.listener = AuSPtr<AuIO::IIOPipeEventListener>(this->channel->ToContext(), this);
// Create a pipe to process an asynchronous repeating stream read transaction of no particular read/EoS limit
this->work = GetRPCProcessor()->ToPipeProcessor()->NewAIOPipe(req);
if (!this->work)
{
SysPushErrorIO("Couldn't create an IO pipe for a new session connection");
this->OnFatalError();
return;
}
// Fire ye pipe consumption
if (!this->work->Start())
{
SysPushErrorIO("Couldn't start reading the IO pipe");
this->OnFatalError();
return;
}
if (!this->channel->OnConnect())
{
SysPushErrorIO("Couldn't start the channel");
this->OnFatalError();
return;
}
}
bool AuRPCPipe::Init(const AuString& str)
{
this->pipe = AuIPC::ImportPipe(str);
if (!this->pipe)
{
SysPushErrorIO("Couldn't open an IPC pipe given handle: {}", str);
return {};
}
this->OnConnect();
this->isClient_ = true;
return true;// WaitForOtherEnd();
}
bool AuRPCPipe::Init()
{
this->pipe = AuIPC::NewPipe();
if (!this->pipe)
{
SysPushErrorIO("Couldn't spare the resources required for an IPC pipe");
return {};
}
return WaitForOtherEnd();
}
bool AuRPCPipe::WaitForOtherEnd()
{
this->isOpenWork = GetRPCProcessor()->StartSimpleLSWatchEx(this->pipe->AsReadChannelIsOpen(),
AuSPtr<AuIO::IIOSimpleEventListener>(this->channel->ToContext(), this),
true);
return bool(this->isOpenWork);
}
void AuRPCPipe::Deinit()
{
if (this->work)
{
work->End();
work.reset();
}
}

49
Source/AuRPCPipe.hpp Normal file
View File

@ -0,0 +1,49 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCPipe.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
struct AuRPCPipePacket;
struct AuRPCChannel;
struct AuRPCPipe : AuIO::IIOBufferedStreamAvailable, AuIO::IIOSimpleEventListener, AuIO::IIOPipeEventListener
{
AuRPCPipe(AuRPCChannel* channel);
~AuRPCPipe();
AuRPCChannel* channel;
AuSPtr<AuIPC::IPCPipe> pipe;
AuSPtr<AuIO::IIOPipeWork> work;
AuSPtr<AuIO::IIOProcessorItem> isOpenWork;
bool isClient_{};
// IIOEventListenerFunctional
void OnIOTick() override;
void OnIOFailure() override;
void OnIOComplete() override;
// Pipe callbacks
void OnPipePartialEvent(AuUInt trasnferred) override;
void OnPipeSuccessEvent() override;
void OnPipeFailureEvent() override;
// IIOBufferedStreamAvailable
bool OnDataAvailable(AuByteBuffer& view) override;
void OnFatalError();
void OnError(bool fatal);
void OnConnect();
bool SendPacket(const AuRPCPipePacket& packet);
bool Init(const AuString& str);
bool Init();
bool WaitForOtherEnd();
void Deinit();
private:
bool errored_{};
};

View File

@ -0,0 +1,16 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCPipePacket.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
struct AuRPCPipePacket
{
AuSPtr<AuRPCClientChannel> clientChannel;
AuSPtr<AuRPCRequest> clientRequest;
bool protPacket {};
AuSPtr<AuRPCResponse> serverResponse;
};

87
Source/AuRPCRequest.cpp Normal file
View File

@ -0,0 +1,87 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCRequest.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCRequest.hpp"
bool AuRPCRequest::SetData(const AuByteBuffer& toRead)
{
return this->SetData(AuMemoryViewRead(toRead));
}
bool AuRPCRequest::SetData(const AuMemoryViewRead& view)
{
data.Reset();
this->packetLength = view.length + this->HeaderLength();
data.Reserve(view.length + this->HeaderLength());
if (!this->data)
{
SysPushErrorMem();
return false;
}
this->WriteRPCHeader();
this->data.Write(view.ptr, view.length);
return bool(data);
}
void AuRPCRequest::SetCallback(AuSPtr<AuRPCRequestCallback> callback)
{
this->callback = callback;
}
AuMemoryViewRead AuRPCRequest::GetData()
{
return this->data;
}
void AuRPCRequest::WriteDummy()
{
WriteRPCHeader();
}
void AuRPCRequest::WriteHeaderConnect()
{
this->data.Write<AuUInt32>(5);
this->data.Write<AuUInt8>(kRequestConnect);
}
void AuRPCRequest::WriteRPCHeader()
{
this->data.Write<AuUInt32>(this->packetLength);
this->data.Write<AuUInt8>(kRequestRPC);
this->data.Write<AuUInt32>(this->serviceId);
this->data.Write<AuUInt32>(this->methodId);
this->data.Write<AuUInt64>(AuUInt64(AuUInt(this)));
}
AuUInt32 AuRPCRequest::HeaderLength()
{
return 4 + 1 + 4 /*service id*/ + 4 /*method*/ + 8 /*uid*/;
}
ERPCRequestState AuRPCRequest::GetState()
{
return this->state;
}
AuSPtr<AuIRPCRequest> AuRPCNewRequest(AuUInt32 serviceId, AuUInt32 methodId)
{
auto req = AuMakeShared<AuRPCRequest>();
if (!req)
{
return {};
}
req->serviceId = serviceId;
req->methodId = methodId;
req->WriteRPCHeader();
return req;
}

36
Source/AuRPCRequest.hpp Normal file
View File

@ -0,0 +1,36 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCRequest.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
struct AuRPCRequest : AuIRPCRequest
{
AuUInt32 serviceId{};
ERPCRequestState state{};
AuUInt32 methodId{};
AuSPtr<AuRPCRequestCallback> callback;
AuUInt64 cookie {};
bool SetData(const AuByteBuffer& toRead) override;
bool SetData(const AuMemoryViewRead& view) override;
void SetCallback(AuSPtr<AuRPCRequestCallback> callback) override;
ERPCRequestState GetState();
AuMemoryViewRead GetData();
void WriteDummy();
void WriteHeaderConnect();
void WriteRPCHeader();
AuUInt32 HeaderLength();
private:
AuUInt32 packetLength{};
AuByteBuffer data;
};

133
Source/AuRPCServer.cpp Normal file
View File

@ -0,0 +1,133 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCServer.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCServer.hpp"
#include "AuRPCServerChannel.hpp"
#include "AuRPCRequest.hpp"
bool AuRPCServer::Init(AuRPC *parent, AuAsync::WorkerPId_t worker)
{
if (this->channel)
{
SysPushErrorGen("Double init");
return {};
}
this->lock = AuThreadPrimitives::RWLockUnique();
if (!this->lock)
{
SysPushErrorMem();
return {};
}
this->worker = worker;
this->parent = parent;
this->channel = NewChannel();
if (!this->channel)
{
return false;
}
this->channel->MakeTemp();
return true;
}
bool AuRPCServer::RegisterService(const AuSPtr<AuIRPCService> service)
{
if (!service)
{
SysPushErrorArg();
return {};
}
AU_LOCK_GUARD(this->lock->AsWritable());
return AuTryInsert(this->serviceTable, service->GetId(), service);
}
AuString AuRPCServer::ExportString()
{
return ToPrimaryChannel()->ExportString();
}
AuSPtr<AuRPCServerChannel> AuRPCServer::ToPrimaryChannel()
{
if (this->channel)
{
return this->channel;
}
return this->channel = NewChannel();
}
void AuRPCServer::Dispatch(AuRPCServerChannel *channel,
const AuSPtr<AuRPCRequest> &request,
AuByteBuffer *buffer)
{
AU_LOCK_GUARD(this->lock->AsReadable());
auto response = AuMakeShared<AuRPCResponseOwned>();
SysAssert(response);
response->cookie = request->cookie;
response->PrepareResponse(kResponseRPC);
auto itr = this->serviceTable.find(request->serviceId);
if (itr == this->serviceTable.end())
{
response->FinalizeWrite();
channel->SendResponse(response);
return;
}
try
{
response->PrepareMessage();
itr->second->Dispatch(*response, request->methodId, *buffer);
channel->SendResponse(response);
response->FinalizeWrite();
}
catch (...)
{
SysPushErrorCatch();
channel->FatalError();
}
}
AuSPtr<AuRPCServerChannel> AuRPCServer::NewChannel()
{
auto eh = AuMakeShared<AuRPCServerChannel>(this->parent->SharedFromThis(), AuSPtr<AuRPCServer>(this->parent->SharedFromThis(), this));
if (!eh)
{
return {};
}
if ((this->worker.second != AuAsync::kThreadIdAny) &&
(this->worker == AuAsync::GetCurrentWorkerPId()))
{
if (!eh->Init())
{
return {};
}
}
else
{
AuAsync::NewWorkItem(this->worker, AuMakeShared<AuAsync::BasicWorkStdFunc>([&]()
{
if (!eh->Init())
{
eh.reset();
}
}), true)->Dispatch()->BlockUntilComplete();
}
return eh;
}

33
Source/AuRPCServer.hpp Normal file
View File

@ -0,0 +1,33 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCServer.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
struct AuRPCRequest;
struct AuRPCServerChannel;
struct AuRPCServer : AuIRPCServer
{
bool Init(AuRPC* parent, AuAsync::WorkerPId_t worker);
bool RegisterService(const AuSPtr<AuIRPCService> service) override;
AuString ExportString() override;
AuSPtr<AuRPCServerChannel> ToPrimaryChannel();
AuSPtr<AuRPCServerChannel> NewChannel();
void Dispatch(AuRPCServerChannel *channel,
const AuSPtr<AuRPCRequest> &request,
AuByteBuffer *buffer);
private:
AuRPC* parent;
AuSPtr<AuRPCServerChannel> channel;
AuAsync::WorkerPId_t worker;
AuThreadPrimitives::RWLockUnique_t lock;
AuHashMap<AuUInt32, AuSPtr<AuIRPCService>> serviceTable;
};

View File

@ -0,0 +1,169 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCServerChannel.cpp
Date: 2022-6-29
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include "AuRPC.hpp"
#include "AuRPCServerChannel.hpp"
#include "AuRPCRequest.hpp"
#include "AuRPCPipePacket.hpp"
AuRPCServerChannel::AuRPCServerChannel(AuSPtr<AuRPC> parent, AuSPtr<AuRPCServer> server) :
parent(parent),
pipe(this),
server_(server)
{
}
AuSPtr<AuRPC> AuRPCServerChannel::ToContext()
{
return this->parent;
}
void AuRPCServerChannel::SendResponse(AuSPtr<AuRPCResponse> response)
{
if (response->message->flagWriteError)
{
this->FatalError();
return;
}
AuRPCPipePacket packet;
packet.serverResponse = response;
this->pipe.SendPacket(packet);
}
AuString AuRPCServerChannel::ExportString()
{
return this->pipe.pipe->ExportToString();
}
bool AuRPCServerChannel::OnConnect()
{
RpcLogDebug("Server channel: on connect");
return true;
}
void AuRPCServerChannel::OnDisconnect(bool error)
{
RpcLogDebug("Server channel disconnected: {}", error);
}
bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view)
{
do
{
auto bytesAvailable = view.RemainingBytes();
if (bytesAvailable < 4)
{
return true;
}
auto oldRead = view.readPtr;
if (view.Read<AuUInt32>() > bytesAvailable)
{
view.readPtr = oldRead;
return true;
}
auto packetType = view.Read<AuUInt8>();
if (packetType == kRequestConnect)
{
if (this->isTempChannel_)
{
this->SendToNewChannel();
}
else
{
this->SendConnectOK();
}
}
else if (packetType == kRequestRPC)
{
auto request = AuMakeShared<AuRPCRequest>();
if (!request)
{
SysPushErrorMem();
view.readPtr = oldRead;
return true;
}
request->serviceId = view.Read<AuUInt32>();
request->methodId = view.Read<AuUInt32>();
request->cookie = view.Read<AuUInt64>();
this->server_->Dispatch(this, request, &view);
}
}
while (true);
return true;
}
void AuRPCServerChannel::SendConnectOK()
{
RpcLogDebug("AuRPCServerChannel::SendConnectOK");
auto res = AuMakeShared<AuRPCResponseOwned>();
if (!res)
{
FatalError();
return;
}
res->PrepareResponse(kResponseConnectOK);
res->FinalizeWrite();
this->SendResponse(res);
}
void AuRPCServerChannel::SendToNewChannel()
{
RpcLogDebug("AuRPCServerChannel::SendToNewChannel");
auto res = AuMakeShared<AuRPCResponseOwned>();
if (!res)
{
FatalError();
return;
}
res->PrepareResponse(kResponseMulticonnect);
auto channel = this->server_->NewChannel();
if (!channel)
{
this->FatalError();
return;
}
if (!AuTryInsert(this->subchannels_, channel))
{
this->FatalError();
return;
}
res->buffer->Write(channel->ExportString());
res->FinalizeWrite();
this->SendResponse(res);
}
void AuRPCServerChannel::FatalError()
{
RpcLogDebug("AuRPCServerChannel::FatalError");
this->pipe.Deinit();
}
bool AuRPCServerChannel::Init()
{
return this->pipe.Init();
}

View File

@ -0,0 +1,43 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuRPCServerChannel.hpp
Date: 2022-6-29
Author: Reece
***/
#pragma once
#include "AuRPCChannel.hpp"
#include "AuRPCPipe.hpp"
struct AuRPCServerChannel : AuRPCChannel
{
AuRPCServerChannel(AuSPtr<AuRPC> parent, AuSPtr<AuRPCServer> server);
bool Init();
AuString ExportString();
void SendResponse(AuSPtr<AuRPCResponse> response);
virtual AuSPtr<AuRPC> ToContext() override;
virtual bool OnConnect() override;
virtual void OnDisconnect(bool error) override;
virtual bool OnDataAvailable(AuByteBuffer& view) override;
void FatalError();
void SendToNewChannel();
void SendConnectOK();
inline void MakeTemp()
{
this->isTempChannel_ = true;
}
private:
AuList<AuSPtr<AuRPCServerChannel>> subchannels_;
AuSPtr<AuRPCServer> server_;
bool isTempChannel_ {};
AuRPCPipe pipe;
AuSPtr<AuRPC> parent;
};