[+] Large packets

This commit is contained in:
Reece Wilson 2023-12-16 22:15:23 +00:00
parent a37a7f1023
commit 37456a9066
7 changed files with 186 additions and 19 deletions

View File

@ -140,7 +140,7 @@ struct AuIRPCRequest
struct AuIRPCSession
{
virtual AuUInt64 GetConnectTimeNS() = 0;
virtual void SendMessage(const AuMemoryViewRead &view) = 0;
virtual bool SendMessage(const AuMemoryViewRead &view, bool bLarge) = 0;
};
struct AuIRPCClientChannel :

View File

@ -61,6 +61,12 @@ void AuRPC::SetRecommendedPipeLength(AuUInt32 uLength)
this->optPipeLength = uLength;
}
AuUInt32 AuRPC::GetLargePacketLength()
{
auto uLength = this->optPipeLength.value_or(AuHwInfo::GetPageSize() * 16);
return uLength / 3;
}
AuSPtr<AuIRPC> AuRPCNewInstance()
{
return AuMakeShared<AuRPC>();

View File

@ -30,6 +30,8 @@ struct AuRPC : AuIRPC, AuEnableSharedFromThis<AuRPC>
AuSPtr<AuIRPCClientChannel> Connect(const AuString &str);
void SetRecommendedPipeLength(AuUInt32 uLength) override;
AuUInt32 GetLargePacketLength();
private:
friend struct AuRPCPipe;
friend struct AuRPCServerChannel;
@ -49,9 +51,10 @@ static const auto kResponseConnectOK = 10u;
static const auto kResponseMulticonnect = 11u;
static const auto kResponseRPC = 12u;
static const auto kGeneralBroadcast = 20u;
static const auto kGeneralClientMessage = 21u; // shared
static const auto kGeneralServerMessage = 21u; // shared
static const auto kGeneralBroadcast = 20u;
static const auto kGeneralClientMessage = 21u; // shared
static const auto kGeneralServerMessage = 21u; // shared
static const auto kGeneralMassiveMessage = 22u;
static const AuUInt8 kResponseError[512] = {0};

View File

@ -56,12 +56,51 @@ AuUInt64 AuRPCClientChannel::GetConnectTimeNS()
return this->uConnectTime_;
}
void AuRPCClientChannel::SendMessage(const AuMemoryViewRead &view)
bool AuRPCClientChannel::SendMessage(const AuMemoryViewRead &view, bool bLarge)
{
auto pMessage = AuMakeSharedPanic<AuRPCRequest>();
pMessage->dataType = kGeneralServerMessage;
pMessage->SetData(view);
this->SendRequest(pMessage);
if (!bLarge &&
view.length < this->parent_->GetLargePacketLength())
{
auto pMessage = AuMakeShared<AuRPCRequest>();
if (!pMessage)
{
return false;
}
pMessage->dataType = kGeneralServerMessage;
pMessage->SetData(view);
this->SendRequest(pMessage);
return true;
}
else
{
auto pSharedMemory = AuIPC::NewSharedMemory(view.length);
if (!pSharedMemory)
{
return false;
}
auto dest = pSharedMemory->GetMemory();
AuMemcpy(dest.ptr, view.ptr, view.length);
auto pMessage = AuMakeShared<AuRPCRequest>();
if (!pMessage)
{
return false;
}
pMessage->dataType = kGeneralMassiveMessage;
AuByteBuffer data;
data.Write(pSharedMemory->ExportToString());
if (!data)
{
return false;
}
pMessage->SetData(data);
this->SendRequest(pMessage);
return true;
}
}
AuSPtr<AuRPC> AuRPCClientChannel::ToContext()
@ -161,7 +200,7 @@ bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view)
}
else
{
pCallbacks->OnBroadcast(view);
pCallbacks->OnMessage(view);
}
}
@ -169,8 +208,45 @@ bool AuRPCClientChannel::OnDataAvailable(AuByteBuffer &view)
view.writePtr = oldWriteHead;
view.length = oldLength;
}
else if (packetType == kGeneralMassiveMessage)
{
auto oldLength = view.length;
auto oldWriteHead = view.writePtr;
view.length = (oldRead - view.base) + frameLength;
view.writePtr = view.base + view.length;
} while (true);
auto str = view.Read<AuString>();
if (view)
{
auto pMemory = AuIPC::ImportSharedMemory(str);
if (!pMemory)
{
SysPushErrorIO("Couldnt open shared memory for large packet");
this->FatalIOError();
return true;
}
AuByteBuffer buf(pMemory->GetMemory());
if (!buf)
{
SysPushErrorMemory();
this->FatalIOError();
return true;
}
if (auto pCallbacks = this->callbacks_)
{
pCallbacks->OnMessage(buf);
}
}
view.readPtr = oldRead + frameLength;
view.writePtr = oldWriteHead;
view.length = oldLength;
}
}
while (true);
return true;
}

View File

@ -26,7 +26,7 @@ struct AuRPCClientChannel :
void FatalIOError();
virtual AuUInt64 GetConnectTimeNS() override;
virtual void SendMessage(const AuMemoryViewRead &view) override;
virtual bool SendMessage(const AuMemoryViewRead &view, bool bLarge) override;
virtual AuSPtr<AuRPC> ToContext() override;
virtual bool OnConnect() override;

View File

@ -165,6 +165,43 @@ bool AuRPCServerChannel::OnDataAvailable(AuByteBuffer& view)
view.writePtr = oldWriteHead;
view.length = oldLength;
}
else if (packetType == kGeneralMassiveMessage)
{
auto oldLength = view.length;
auto oldWriteHead = view.writePtr;
view.length = (oldRead - view.base) + frameLength;
view.writePtr = view.base + view.length;
auto str = view.Read<AuString>();
if (view)
{
auto pMemory = AuIPC::ImportSharedMemory(str);
if (!pMemory)
{
SysPushErrorIO("Couldnt open shared memory for large packet");
this->FatalError();
return true;
}
AuByteBuffer buf(pMemory->GetMemory());
if (!buf)
{
SysPushErrorMemory();
this->FatalError();
return true;
}
if (auto pCallbacks = this->server_->pCallbacks)
{
pCallbacks->OnMessage(this->SharedFromThis(),
buf);
}
}
view.readPtr = oldRead + frameLength;
view.writePtr = oldWriteHead;
view.length = oldLength;
}
}
while (true);
@ -177,13 +214,58 @@ AuUInt64 AuRPCServerChannel::GetConnectTimeNS()
return this->uConnectTime_;
}
void AuRPCServerChannel::SendMessage(const AuMemoryViewRead &view)
bool AuRPCServerChannel::SendMessage(const AuMemoryViewRead &view, bool bLarge)
{
auto pMessage = AuMakeSharedPanic<AuRPCResponseOwned>();
pMessage->PrepareResponse(kGeneralClientMessage);
pMessage->buffer->Write(view.ptr, view.length);
pMessage->FinalizeWrite();
this->SendResponse(pMessage);
if (!bLarge &&
view.length < this->parent->GetLargePacketLength())
{
auto pMessage = AuMakeShared<AuRPCResponseOwned>();
if (!pMessage)
{
return false;
}
pMessage->PrepareResponse(kGeneralClientMessage);
pMessage->buffer->Write(view.ptr, view.length);
pMessage->FinalizeWrite();
if (pMessage->message->flagWriteError)
{
return false;
}
this->SendResponse(pMessage);
return true;
}
else
{
auto pSharedMemory = AuIPC::NewSharedMemory(view.length);
if (!pSharedMemory)
{
return false;
}
auto dest = pSharedMemory->GetMemory();
AuMemcpy(dest.ptr, view.ptr, view.length);
auto pMessage = AuMakeShared<AuRPCResponseOwned>();
if (!pMessage)
{
return false;
}
pMessage->PrepareResponse(kGeneralMassiveMessage);
pMessage->buffer->Write(AuString(pSharedMemory->ExportToString()));
pMessage->FinalizeWrite();
if (pMessage->message->flagWriteError)
{
return false;
}
this->SendResponse(pMessage);
return true;
}
}
void AuRPCServerChannel::SendConnectOK()

View File

@ -29,7 +29,7 @@ struct AuRPCServerChannel :
virtual bool OnDataAvailable(AuByteBuffer& view) override;
virtual AuUInt64 GetConnectTimeNS() override;
virtual void SendMessage(const AuMemoryViewRead &view) override;
virtual bool SendMessage(const AuMemoryViewRead &view, bool bLarge) override;
void FatalError();