AuroraRuntime/Source/IO/Net/AuNetStream.Linux.cpp

426 lines
11 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetStream.Linux.cpp
Date: 2022-8-26
Author: Reece
***/
#include "Networking.hpp"
#include "AuNetStream.Linux.hpp"
#include "AuNetSocket.hpp"
#include "AuNetWorker.hpp"
#include "AuNetEndpoint.hpp"
#include <Source/IO/Loop/LSEvent.hpp>
namespace Aurora::IO::Net
{
LinuxAsyncNetworkTransaction::LinuxAsyncNetworkTransaction(SocketBase *pSocket) :
pSocket(pSocket)
{
if (this->pSocket->GetRemoteEndpoint().transportProtocol == ETransportProtocol::eProtocolTCP)
{
//this->dwRecvFlags = MSG_PARTIAL;
}
}
LinuxAsyncNetworkTransaction::~LinuxAsyncNetworkTransaction()
{
Reset();
}
bool LinuxAsyncNetworkTransaction::StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView)
{
if (this->bDisallowRecv)
{
SysPushErrorIO("Recv isn't allowed");
return false;
}
if (this->bIsIrredeemable)
{
SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object");
return false;
}
if (!IDontWannaUsePorts())
{
return false;
}
if (!memoryView)
{
SysPushErrorArg();
return {};
}
//if (this->pMemoryHold)
//{
// SysPushErrorIO("IO Operation in progress");
// return {};
//}
this->bLatch = false;
//this->pMemoryHold = memoryView;
this->bHasFailed = false;
this->dwLastAbstractStat = memoryView->length;
this->dwLastAbstractOffset = offset;
this->dwLastBytes = 0;
this->bIsWriting = false;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
if (!this->bDatagramMode)
{
AuLoop::ILSEvent *optEvent { this->GetAlertable() };
if (!UNIX::LinuxOverlappedSubmitRead(this->GetSocket(), 0, this, optEvent, true))
{
LIOS_SendProcess(0, true, errno);
return true;
}
else
{
if (gRuntimeConfig.linuxConfig.bFIODisableBatching)
{
UNIX::SendIOBuffers();
}
return true;
}
}
else
{
return false;
}
}
bool LinuxAsyncNetworkTransaction::StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView)
{
if (this->bDisallowSend)
{
SysPushErrorIO("Send isn't allowed");
return false;
}
if (this->bIsIrredeemable)
{
SysPushErrorIO("Transaction was signaled to be destroyed to reset mid synchronizable operation. You can no longer use this stream object");
return false;
}
if (!IDontWannaUsePorts())
{
return false;
}
if (!memoryView)
{
SysPushErrorArg();
return {};
}
//if (this->pMemoryHold)
//{
// SysPushErrorIO("IO Operation in progress");
// return {};
//}
this->bLatch = false;
//this->pMemoryHold = memoryView;
this->bHasFailed = false;
this->dwLastAbstractStat = memoryView->length;
this->dwLastAbstractOffset = offset;
this->dwLastBytes = 0;
this->bIsWriting = true;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
if (!this->bDatagramMode)
{
AuLoop::ILSEvent *optEvent { this->GetAlertable() };
if (!UNIX::LinuxOverlappedSubmitWrite(this->GetSocket(), 0, this, optEvent))
{
LIOS_SendProcess(0, true, errno);
return false;
}
else
{
if (gRuntimeConfig.linuxConfig.bFIODisableBatching)
{
UNIX::SendIOBuffers();
}
return true;
}
}
else
{
this->iSocketLength = this->pSocket->endpointSize_;
#if 0
AuLogDbg("{} -> {} {}, {}, {} {}", this->GetSocket(),
memoryView->ptr,
memoryView->length,
0,
(void *)netEndpoint.hint,
this->iSocketLength);
#endif
if (::sendto(this->GetSocket(),
memoryView->ptr,
memoryView->length,
0,
(struct sockaddr *)netEndpoint.hint,
EndpointToLength(netEndpoint)) != memoryView->length)
{
LIOS_SendProcess(0, false, errno);
return true;
}
LIOS_SendProcess(memoryView->length, true, errno);
return true;
}
}
void LinuxAsyncNetworkTransaction::LIOS_Process(AuUInt32 read, bool failure, int err, bool mark)
{
this->dwLastBytes = failure ? 0 : read;
this->bHasFailed |= failure;
this->dwOsErrorCode = err;
if (mark)
{
return;
}
this->DispatchCb(read);
}
bool LinuxAsyncNetworkTransaction::TranslateLastError(bool bReturnValue)
{
return false;
}
void LinuxAsyncNetworkTransaction::SetBaseOffset(AuUInt64 uBaseOffset)
{
}
bool LinuxAsyncNetworkTransaction::TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
{
if (bool(this->pCompletionGroup_) ||
!pCompletionGroup)
{
return false;
}
auto pLoopSource = pCompletionGroup->GetTriggerLoopSource();
if (!pLoopSource)
{
return false;
}
pCompletionGroup->AddWorkItem(this->SharedFromThis());
this->pCompletionGroup_ = pCompletionGroup;
return true;
}
CompletionGroup::ICompletionGroupWorkHandle *LinuxAsyncNetworkTransaction::ToCompletionGroupHandle()
{
return this;
}
AuSPtr<CompletionGroup::ICompletionGroup> LinuxAsyncNetworkTransaction::GetCompletionGroup()
{
return this->pCompletionGroup_;
}
bool LinuxAsyncNetworkTransaction::Complete()
{
return this->bLatch;
}
bool LinuxAsyncNetworkTransaction::CompleteEx(AuUInt completeRoutine)
{
return false;
}
bool LinuxAsyncNetworkTransaction::HasFailed()
{
return this->bHasFailed;
}
bool LinuxAsyncNetworkTransaction::HasCompleted()
{
return this->bHasFailed ||
this->dwLastBytes ||
this->bLatch;
}
AuUInt LinuxAsyncNetworkTransaction::GetOSErrorCode()
{
return this->dwOsErrorCode;
}
AuUInt32 LinuxAsyncNetworkTransaction::GetLastPacketLength()
{
return this->dwLastBytes;
}
void LinuxAsyncNetworkTransaction::SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub)
{
this->pSub = sub;
}
bool LinuxAsyncNetworkTransaction::Wait(AuUInt32 timeout)
{
return this->bLatch;
}
bool LinuxAsyncNetworkTransaction::HasCompletedForGCWI()
{
return this->HasCompleted();
}
void LinuxAsyncNetworkTransaction::CleanupForGCWI()
{
AuResetMember(this->pCompletionGroup_);
}
AuSPtr<AuLoop::ILoopSource> LinuxAsyncNetworkTransaction::NewLoopSource()
{
AuLoop::ILSEvent *optEvent {};
if (this->pCompletionGroup_)
{
if (auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource())
{
return pLoopSource;
}
}
{
if (auto pWaitable = this->pWaitable)
{
return pWaitable;
}
if (auto pSocket = this->pSocket)
{
return AuStaticCast<AuLoop::LSEvent>(pSocket->ToWorkerEx()->ToEvent());
}
//return this->pWaitable = AuLoop::NewLSEventSlow(false, true);
}
return {};
}
void LinuxAsyncNetworkTransaction::Reset()
{
if (this->dwLastAbstractStat)
{
this->bIsIrredeemable = true;
this->bHasFailed = true;
this->dwOsErrorCode = 0;
(void)this->LIOS_Cancel();
}
else
{
this->bHasFailed = false;
}
this->dwLastBytes = 0;
this->dwLastAbstractStat = 0;
}
void LinuxAsyncNetworkTransaction::MakeSyncable()
{
this->pWaitable = AuLoop::NewLSEventSlow(false, true);
SysAssert(this->pWaitable);
//this->overlap.hEvent = (HANDLE)AuStaticCast<Loop::LSEvent>(this->pWaitable)->GetHandle();
}
void LinuxAsyncNetworkTransaction::ForceNextWriteWait()
{
this->bForceNextWait = true;
}
bool LinuxAsyncNetworkTransaction::IDontWannaUsePorts()
{
return true;
}
void LinuxAsyncNetworkTransaction::DispatchCb(AuUInt32 read)
{
if (this->bIsWriting)
{
if (read != this->dwLastAbstractStat)
{
this->dwOsErrorCode = 69;
this->bHasFailed = true;
}
}
this->dwLastAbstractStat = 0;
this->dwLastBytes = read;
if (AuExchange(this->bLatch, true))
{
return;
}
if (this->pSub)
{
try
{
this->pSub->OnAsyncFileOpFinished(this->dwLastAbstractOffset, read);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
int LinuxAsyncNetworkTransaction::GetSocket()
{
return this->pSocket->ToPlatformHandle();
}
AuLoop::ILSEvent *LinuxAsyncNetworkTransaction::GetAlertable()
{
if (this->pCompletionGroup_)
{
auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource();
if (pLoopSource)
{
return AuStaticCast<Loop::LSEvent>(pLoopSource).get();
}
}
else
{
if (auto pWaitable = this->pWaitable)
{
return AuStaticCast<AuLoop::LSEvent>(pWaitable).get();
}
if (auto pSocket = this->pSocket)
{
return AuStaticCast<AuLoop::LSEvent>(pSocket->ToWorkerEx()->ToEvent()).get();
}
}
return nullptr;
}
}