AuroraRuntime/Source/IO/Net/AuNetSocketChannelOutput.cpp
Reece Wilson f43251c8fc [+] AuNet::ISocketChannelEventListener
[+] AuFS::UpdateTimes
[+] AuFS::UpdateFileTimes
[+] AuFS::CompressEx
[*] AuFS::Compress now rejects files that look to be already compressed
[+] AuFS::DecompressEx
[+] AuFS::Create
[+] AuFS::WriteNewFile
[+] AuFS::WriteNewString
[+] AuFs::FileAttrsList
[+] AuFs::FileAttrsGet
[+] AuFs::FileAttrsSet
[+] DirectoryLogger::uMaxLogsOrZeroBeforeCompress
[+] ISocketChannel.AddEventListener
[+] ISocketChannel.AddEventListener
[+] DirectoryLogger.uMaxLogsOrZeroBeforeCompress
[*] Fix UNIX regression
[*] Fix up stream socket channel realloc IPC
[*] Fix shutdown regression in pretty much everything thanks to 8ff81df1's dumbass fix
    (fixes fence regression on shutdown)
[*] Fix DirDeleterEx formatting of reported failed paths
[*] Fix up file not truncated if already exists bugs. Extended and alternative apis added.
[*] Fix ICompressionStream::ReadEx returning the wrong read value
[+] Legacy compression API can now self-correct once newer stream processor objects are added
2023-02-04 19:43:01 +00:00

251 lines
7.3 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetSocketChannelOutput.cpp
Date: 2022-8-21
Author: Reece
***/
#include "Networking.hpp"
#include "AuNetSocket.hpp"
#include "AuNetSocketChannelOutput.hpp"
#include "AuNetWorker.hpp"
#include <Source/IO/AuIOPipeProcessor.hpp>
#if defined(AURORA_IS_MODERNNT_DERIVED)
#include "AuNetStream.NT.hpp"
#else
#include "AuNetStream.Linux.hpp"
#endif
// Size passed as the first constructor argument of SocketChannelOutput::outputBuffer_ (64 * 1024).
static constexpr int kDefaultBufferSize = 64 * 1024;
namespace Aurora::IO::Net
{
// Constructs the output half of a socket channel.
//   pParent - owning socket; stored in pParent_ and also handed to the write queue as its pBase.
//   stream  - transaction object used for network writes; stored in pNetWriteTransaction_.
// The output buffer is constructed with kDefaultBufferSize and `true`
// (NOTE(review): meaning of the second argument is not visible from this file - confirm).
SocketChannelOutput::SocketChannelOutput(SocketBase *pParent, const AuSPtr<IAsyncTransaction> &stream)
    : pParent_(pParent)
    , pNetWriteTransaction_(stream)
    , outputBuffer_(kDefaultBufferSize, true)
{
    this->outputWriteQueue_.pBase = pParent;
}
// Reports whether construction produced a usable object: both the write transaction
// and the output buffer must evaluate to true.
bool SocketChannelOutput::IsValid()
{
    if (!bool(this->pNetWriteTransaction_))
    {
        return false;
    }

    return bool(this->outputBuffer_);
}
// Returns the write transaction held by this channel as an IAsyncTransaction pointer.
AuSPtr<IAsyncTransaction> SocketChannelOutput::ToWriteTransaction()
{
    AuSPtr<IAsyncTransaction> pTransaction(this->pNetWriteTransaction_);
    return pTransaction;
}
// Returns a pointer to outputBuffer_ built from the parent's SharedFromThis() pointer and the
// buffer's address (two-argument AuSPtr constructor), so the returned handle refers to the
// member buffer rather than to a copy.
AuSPtr<Memory::ByteBuffer> SocketChannelOutput::AsWritableByteBuffer()
{
    auto *pBuffer = &this->outputBuffer_;
    return AuSPtr<Memory::ByteBuffer>(this->pParent_->SharedFromThis(), pBuffer);
}
void SocketChannelOutput::ScheduleOutOfFrameWrite()
{
SendIfData();
SchedWriteTick();
}
// Arranges for WriteTick() to run via the parent's worker (ToWorkerEx()).
//  - If the parent has already finalized, nothing is scheduled: the tick runs inline only when the
//    caller is already on the worker thread; otherwise this call does nothing.
//  - Otherwise a task is submitted through TryScheduleInternalTemplate. If submission fails, an IO
//    error is pushed and the parent begins an error shutdown.
void SocketChannelOutput::SchedWriteTick()
{
auto pWorker = this->pParent_->ToWorkerEx();
if (this->pParent_->bHasFinalized_)
{
if (pWorker->IsOnThread())
{
WriteTick();
}
return;
}
// Pointer to this object constructed from the parent's shared pointer; captured by the task below.
// NOTE(review): presumably this keeps the parent socket alive while the task is pending - confirm.
auto meShared = AuSPtr<SocketChannelOutput>(this->pParent_->SharedFromThis(), this);
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
// Task body: perform the write tick, then report success on the promise callback.
meShared->WriteTick();
info->OnSuccess((void *)nullptr);
}, AuStaticCast<AuAsync::PromiseCallback<AuNullS, AuNullS>>(AuMakeShared<AuAsync::PromiseCallbackFunctional<AuNullS, AuNullS>>([=](const AuSPtr<AuNullS> &dumb)
{
// Intentionally empty callback.
}))))
{
SysPushErrorIO("Couldn't schedule write tick");
this->pParent_->SendErrorBeginShutdown({});
}
}
void SocketChannelOutput::SendIfData()
{
AU_LOCK_GUARD(this->lock_);
struct View : AuMemoryViewRead
{
View(const AuMemoryViewRead &in) : AuMemoryViewRead(in)
{
}
AuSPtr<void> pin;
};
while (this->pOutSendPointer_ != this->outputBuffer_.writePtr)
{
if (this->pOutSendPointer_ == this->outputBuffer_.writePtr)
{
break;
}
if (this->pOutSendPointer_ == this->outputBuffer_.base + this->outputBuffer_.length)
{
this->pOutSendPointer_ = this->outputBuffer_.base;
continue;
}
if (!this->pOutSendPointer_)
{
this->pOutSendPointer_ = this->outputBuffer_.base;
continue;
}
if ((AuUInt8 *)this->pOutSendPointer_ > this->outputBuffer_.writePtr)
{
auto pBase = (AuUInt8 *)this->pOutSendPointer_;
auto uLen = (AuUInt8 *)this->outputBuffer_.base + this->outputBuffer_.length - pBase;
auto pView = AuMakeShared<View>(AuMemoryViewRead(pBase, uLen));
SysAssert(pView);
pView->pin = this->pParent_->SharedFromThis();
this->outputWriteQueue_.Push(pView);
this->pOutSendPointer_ = (AuUInt8 *)this->pOutSendPointer_ + uLen;
}
else
{
auto pBase = (AuUInt8 *)this->pOutSendPointer_;
auto pWriteHead = (AuUInt8 *)this->outputBuffer_.writePtr;
auto uLen = pWriteHead - pBase;
auto pView = AuMakeShared<View>(AuMemoryViewRead(pBase, uLen));
SysAssert(pView);
pView->pin = this->pParent_->SharedFromThis();
this->outputWriteQueue_.Push(pView);
this->pOutSendPointer_ = pWriteHead ;
}
}
}
// Runs one write tick while holding lock_ and, if WriteTickLocked() requests it, shuts the
// parent socket down. Shutdown(true) is called after the lock guard's scope has closed.
void SocketChannelOutput::WriteTick()
{
    bool bShouldShutdown {};

    {
        AU_LOCK_GUARD(this->lock_);

        // The result must land in the local. Assigning it to the bShutdownOnComplete member
        // (which WriteTickLocked reads as the "shutdown once drained" request) would clobber
        // that request on every tick and leave bShouldShutdown false forever.
        bShouldShutdown = WriteTickLocked();
    }

    if (bShouldShutdown)
    {
        this->pParent_->Shutdown(true);
    }
}
bool SocketChannelOutput::WriteTickLocked()
{
#if defined(AURORA_IS_MODERNNT_DERIVED)
auto pHackTransaction =
AuStaticCast<NtAsyncNetworkTransaction>(this->pNetWriteTransaction_);
#else
auto pHackTransaction =
AuStaticCast<LinuxAsyncNetworkTransaction>(this->pNetWriteTransaction_);
#endif
if (pHackTransaction->bIsWriting)
{
// IsComplete?
if (!pHackTransaction->bLatch)
{
return false;
}
}
if (this->outputWriteQueue_.IsEmpty())
{
if (this->pParent_)// &&
///this->outputBuffer_.outputChannel.AsWritableByteBuffer()->GetNextLinearRead().length == 0)
{
this->pParent_->socketChannel_.DoReallocWriteTick();
}
}
if (auto pFrameToSend = this->outputWriteQueue_.Dequeue())
{
this->pNetWriteTransaction_->SetCallback(AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
{
SysPushErrorIO("Couldn't begin wait");
this->pParent_->SendErrorBeginShutdown({});
return false;
}
if (!this->pNetWriteTransaction_->StartWrite(0, pFrameToSend))
{
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
SysPushErrorIO("Couldn't dispatch the to-send frame");
this->pParent_->SendErrorBeginShutdown({});
return false;
}
}
else
{
this->pNetWriteTransaction_->SetCallback({});
if (this->bShutdownOnComplete)
{
return true;
}
}
return false;
}
void SocketChannelOutput::OnEndOfReadTick()
{
SendIfData();
WriteTick();
}
// Gives direct (reference) access to the channel's output buffer.
AuByteBuffer &SocketChannelOutput::GetByteBuffer()
{
    auto &buffer = this->outputBuffer_;
    return buffer;
}
// Returns the result of outputWriteQueue_.IsEmpty(), i.e. true when the write queue holds nothing.
bool SocketChannelOutput::CanResize()
{
    const bool bQueueDrained = this->outputWriteQueue_.IsEmpty();
    return bQueueDrained;
}
// Completion callback for a write started by WriteTickLocked().
//   offset - unused here.
//   length - number of bytes reported for the completed operation.
// Decrements the worker's IO event task counter, adds `length` to the channel's send statistics,
// and either begins an error shutdown (length == 0) or notifies the write queue, advances the
// output buffer's reader by `length`, and runs another write tick.
void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
{
    this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();

    auto pChannel = AuStaticCast<SocketChannel>(this->pParent_->ToChannel());
    pChannel->GetSendStatsEx().AddBytes(length);

    // A zero-length completion is treated as a failure.
    if (length == 0)
    {
        this->pParent_->SendErrorBeginShutdown({});
        return;
    }

    // does the resend bit
    this->outputWriteQueue_.NotifyBytesWritten(length);

    SysAssert(this->outputBuffer_.ReaderTryGoForward(length));

    this->WriteTick();
}
}