diff --git a/Include/AuRPCAPI.hpp b/Include/AuRPCAPI.hpp index f4196d3..3cd69ad 100644 --- a/Include/AuRPCAPI.hpp +++ b/Include/AuRPCAPI.hpp @@ -140,7 +140,7 @@ struct AuIRPCRequest struct AuIRPCSession { virtual AuUInt64 GetConnectTimeNS() = 0; - virtual void SendMessage(const AuMemoryViewRead &view) = 0; + virtual bool SendMessage(const AuMemoryViewRead &view, bool bLarge) = 0; }; struct AuIRPCClientChannel : diff --git a/Source/AuRPC.cpp b/Source/AuRPC.cpp index 061d88b..02c0468 100644 --- a/Source/AuRPC.cpp +++ b/Source/AuRPC.cpp @@ -61,6 +61,12 @@ void AuRPC::SetRecommendedPipeLength(AuUInt32 uLength) this->optPipeLength = uLength; } +AuUInt32 AuRPC::GetLargePacketLength() +{ + auto uLength = this->optPipeLength.value_or(AuHwInfo::GetPageSize() * 16); + return uLength / 3; +} + AuSPtr AuRPCNewInstance() { return AuMakeShared(); diff --git a/Source/AuRPC.hpp b/Source/AuRPC.hpp index 4123891..86c80dd 100644 --- a/Source/AuRPC.hpp +++ b/Source/AuRPC.hpp @@ -30,6 +30,8 @@ struct AuRPC : AuIRPC, AuEnableSharedFromThis AuSPtr Connect(const AuString &str); void SetRecommendedPipeLength(AuUInt32 uLength) override; + AuUInt32 GetLargePacketLength(); + private: friend struct AuRPCPipe; friend struct AuRPCServerChannel; @@ -49,9 +51,10 @@ static const auto kResponseConnectOK = 10u; static const auto kResponseMulticonnect = 11u; static const auto kResponseRPC = 12u; -static const auto kGeneralBroadcast = 20u; -static const auto kGeneralClientMessage = 21u; // shared -static const auto kGeneralServerMessage = 21u; // shared +static const auto kGeneralBroadcast = 20u; +static const auto kGeneralClientMessage = 21u; // shared +static const auto kGeneralServerMessage = 21u; // shared +static const auto kGeneralMassiveMessage = 22u; static const AuUInt8 kResponseError[512] = {0}; diff --git a/Source/AuRPCClientChannel.cpp b/Source/AuRPCClientChannel.cpp index fa7ff1a..8cf04fd 100644 --- a/Source/AuRPCClientChannel.cpp +++ b/Source/AuRPCClientChannel.cpp @@ -56,12 +56,51 @@ AuUInt64 AuRPCClientChannel::GetConnectTimeNS() return this->uConnectTime_; } -void AuRPCClientChannel::SendMessage(const AuMemoryViewRead &view) +bool AuRPCClientChannel::SendMessage(const AuMemoryViewRead &view, bool bLarge) { - auto pMessage = AuMakeSharedPanic(); - pMessage->dataType = kGeneralServerMessage; - pMessage->SetData(view); - this->SendRequest(pMessage); + if (!bLarge && + view.length < this->parent_->GetLargePacketLength()) + { + auto pMessage = AuMakeShared(); + if (!pMessage) + { + return false; + } + + pMessage->dataType = kGeneralServerMessage; + pMessage->SetData(view); + this->SendRequest(pMessage); + return true; + } + else + { + auto pSharedMemory = AuIPC::NewSharedMemory(view.length); + if (!pSharedMemory) + { + return false; + } + + auto dest = pSharedMemory->GetMemory(); + AuMemcpy(dest.ptr, view.ptr, view.length); + + auto pMessage = AuMakeShared(); + if (!pMessage) + { + return false; + } + + pMessage->dataType = kGeneralMassiveMessage; + AuByteBuffer data; + data.Write(pSharedMemory->ExportToString()); + if (!data) + { + return false; + } + + pMessage->SetData(data); + this->SendRequest(pMessage); + return true; + } } AuSPtr AuRPCClientChannel::ToContext() @@ -161,7 +200,7 @@ bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view) } else { - pCallbacks->OnBroadcast(view); + pCallbacks->OnMessage(view); } } @@ -169,8 +208,45 @@ bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view) view.writePtr = oldWriteHead; view.length = oldLength; } + else if (packetType == kGeneralMassiveMessage) + { + auto oldLength = view.length; + auto oldWriteHead = view.writePtr; + view.length = (oldRead - view.base) + frameLength; + view.writePtr = view.base + view.length; - } while (true); + auto str = view.Read(); + if (view) + { + auto pMemory = AuIPC::ImportSharedMemory(str); + if (!pMemory) + { + SysPushErrorIO("Couldnt open shared memory for large packet"); + this->FatalIOError(); + return true; + } + + AuByteBuffer buf(pMemory->GetMemory()); + if (!buf) + { + SysPushErrorMemory(); + this->FatalIOError(); + return true; + } + + if (auto pCallbacks = this->callbacks_) + { + pCallbacks->OnMessage(buf); + } + } + + view.readPtr = oldRead + frameLength; + view.writePtr = oldWriteHead; + view.length = oldLength; + } + + } + while (true); return true; } diff --git a/Source/AuRPCClientChannel.hpp b/Source/AuRPCClientChannel.hpp index be08704..a81dfd9 100644 --- a/Source/AuRPCClientChannel.hpp +++ b/Source/AuRPCClientChannel.hpp @@ -26,7 +26,7 @@ struct AuRPCClientChannel : void FatalIOError(); virtual AuUInt64 GetConnectTimeNS() override; - virtual void SendMessage(const AuMemoryViewRead &view) override; + virtual bool SendMessage(const AuMemoryViewRead &view, bool bLarge) override; virtual AuSPtr ToContext() override; virtual bool OnConnect() override; diff --git a/Source/AuRPCServerChannel.cpp b/Source/AuRPCServerChannel.cpp index 59158ec..39ed481 100644 --- a/Source/AuRPCServerChannel.cpp +++ b/Source/AuRPCServerChannel.cpp @@ -165,6 +165,43 @@ bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view) view.writePtr = oldWriteHead; view.length = oldLength; } + else if (packetType == kGeneralMassiveMessage) + { + auto oldLength = view.length; + auto oldWriteHead = view.writePtr; + view.length = (oldRead - view.base) + frameLength; + view.writePtr = view.base + view.length; + + auto str = view.Read(); + if (view) + { + auto pMemory = AuIPC::ImportSharedMemory(str); + if (!pMemory) + { + SysPushErrorIO("Couldnt open shared memory for large packet"); + this->FatalError(); + return true; + } + + AuByteBuffer buf(pMemory->GetMemory()); + if (!buf) + { + SysPushErrorMemory(); + this->FatalError(); + return true; + } + + if (auto pCallbacks = this->server_->pCallbacks) + { + pCallbacks->OnMessage(this->SharedFromThis(), + buf); + } + } + + view.readPtr = oldRead + frameLength; + view.writePtr = oldWriteHead; + view.length = oldLength; + } } while (true); @@ -177,13 +214,58 @@ AuUInt64 AuRPCServerChannel::GetConnectTimeNS() return this->uConnectTime_; } -void AuRPCServerChannel::SendMessage(const AuMemoryViewRead &view) +bool AuRPCServerChannel::SendMessage(const AuMemoryViewRead &view, bool bLarge) { - auto pMessage = AuMakeSharedPanic(); - pMessage->PrepareResponse(kGeneralClientMessage); - pMessage->buffer->Write(view.ptr, view.length); - pMessage->FinalizeWrite(); - this->SendResponse(pMessage); + if (!bLarge && + view.length < this->parent->GetLargePacketLength()) + { + auto pMessage = AuMakeShared(); + if (!pMessage) + { + return false; + } + + pMessage->PrepareResponse(kGeneralClientMessage); + pMessage->buffer->Write(view.ptr, view.length); + pMessage->FinalizeWrite(); + + if (pMessage->message->flagWriteError) + { + return false; + } + + this->SendResponse(pMessage); + return true; + } + else + { + auto pSharedMemory = AuIPC::NewSharedMemory(view.length); + if (!pSharedMemory) + { + return false; + } + + auto dest = pSharedMemory->GetMemory(); + AuMemcpy(dest.ptr, view.ptr, view.length); + + auto pMessage = AuMakeShared(); + if (!pMessage) + { + return false; + } + + pMessage->PrepareResponse(kGeneralMassiveMessage); + pMessage->buffer->Write(AuString(pSharedMemory->ExportToString())); + pMessage->FinalizeWrite(); + + if (pMessage->message->flagWriteError) + { + return false; + } + + this->SendResponse(pMessage); + return true; + } } void AuRPCServerChannel::SendConnectOK() diff --git a/Source/AuRPCServerChannel.hpp b/Source/AuRPCServerChannel.hpp index 13f37a8..2c1625f 100644 --- a/Source/AuRPCServerChannel.hpp +++ b/Source/AuRPCServerChannel.hpp @@ -29,7 +29,7 @@ struct AuRPCServerChannel : virtual bool OnDataAvailable(AuByteBuffer& view) override; virtual AuUInt64 GetConnectTimeNS() override; - virtual void SendMessage(const AuMemoryViewRead &view) override; + virtual bool SendMessage(const AuMemoryViewRead &view, bool bLarge) override; void FatalError();