/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AuNetSocketChannelOutput.cpp
    Date: 2022-8-21
    Author: Reece
***/
#include "Networking.hpp"
#include "AuNetSocket.hpp"
#include "AuNetSocketChannelOutput.hpp"
#include "AuNetWorker.hpp"
// NOTE(review): the target of the following include is missing in this copy of the file, and
// several template argument lists below (on AuSPtr / AuStaticCast / AuMakeShared) look truncated.
// They are kept exactly as found; restore them from version control before building.
#include

#if defined(AURORA_IS_MODERNNT_DERIVED)
    #include "AuNetStream.NT.hpp"
#else
    #include "AuNetStream.Linux.hpp"
#endif

// Initial size, in bytes, handed to the output byte buffer's constructor.
static const auto kDefaultBufferSize = 64 * 1024;

namespace Aurora::IO::Net
{
    // Binds the output channel to its owning socket and to the transaction used for writes.
    //  pParent - owning socket; also installed as the write queue's pBase.
    //  stream  - write transaction that WriteTickLocked() later drives via StartWrite().
    SocketChannelOutput::SocketChannelOutput(SocketBase *pParent, const AuSPtr &stream) :
        pParent_(pParent),
        pNetWriteTransaction_(stream),
        outputBuffer_(kDefaultBufferSize, true)
    {
        this->outputWriteQueue_.pBase = pParent;
    }

    // True when both the write transaction and the output buffer evaluate to true.
    bool SocketChannelOutput::IsValid()
    {
        return bool(this->pNetWriteTransaction_) &&
               bool(this->outputBuffer_);
    }

    // Returns the write transaction this channel was constructed with.
    AuSPtr SocketChannelOutput::ToWriteTransaction()
    {
        return AuSPtr(this->pNetWriteTransaction_);
    }

    // Returns a shared pointer to the output buffer, or an empty pointer when there is no parent.
    // The pointer is built from the parent's shared pointer plus the buffer's address
    // (presumably an aliasing pointer that keeps the parent socket alive - confirm against AuSPtr).
    AuSPtr SocketChannelOutput::AsWritableByteBuffer()
    {
        if (!this->pParent_)
        {
            return {};
        }

        return AuSPtr(this->pParent_->SharedFromThis(), &this->outputBuffer_);
    }

    // Queues whatever has been written to the output buffer, then requests a write tick.
    void SocketChannelOutput::ScheduleOutOfFrameWrite()
    {
        SendIfData();
        SchedWriteTick();
    }

    // Requests a WriteTick() on the parent's worker.
    //  - No parent: nothing happens.
    //  - Parent already finalized: WriteTick() runs inline, but only when the caller is already
    //    on the worker thread; otherwise the request is dropped.
    //  - Otherwise the tick is scheduled on the worker; if scheduling fails an IO error is
    //    pushed and the parent begins an error shutdown.
    void SocketChannelOutput::SchedWriteTick()
    {
        if (!this->pParent_)
        {
            return;
        }

        auto pWorker = this->pParent_->ToWorkerEx();

        if (this->pParent_->bHasFinalized_)
        {
            if (pWorker->IsOnThread())
            {
                WriteTick();
            }
            return;
        }

        // Built from the parent's shared pointer so the scheduled callback holds a reference
        // derived from the parent rather than a bare `this`.
        auto meShared = AuSPtr(this->pParent_->SharedFromThis(), this);

        if (!pWorker->TryScheduleInternalTemplate([=](const AuSPtr> &info)
            {
                meShared->WriteTick();
                info->OnSuccess((void *)nullptr);
            },
            AuStaticCast>(AuMakeShared>([=](const AuSPtr &dumb)
            {
            }))))
        {
            SysPushErrorIO("Couldn't schedule write tick");
            this->pParent_->SendErrorBeginShutdown({});
        }
    }

    // Moves every byte between the send cursor (pOutSendPointer_) and the buffer's write head
    // into the write queue as read-only views. Runs under lock_.
    //
    // The buffer is treated as wrapping at base + length: when the cursor is ahead of the write
    // head, the tail [cursor, base + length) is queued first, the cursor wraps to base, and the
    // remaining [base, writePtr) is queued on the next iteration.
    void SocketChannelOutput::SendIfData()
    {
        AU_LOCK_GUARD(this->lock_);

        if (!this->pParent_)
        {
            return;
        }

        // A memory view that additionally carries `pin`, which is assigned the parent's shared
        // pointer below so each queued view holds a reference to the parent.
        struct View : AuMemoryViewRead
        {
            View(const AuMemoryViewRead &in) : AuMemoryViewRead(in)
            {
            }

            AuSPtr pin;
        };

        while (this->pOutSendPointer_ != this->outputBuffer_.writePtr)
        {
            // Cursor sits at the end of the buffer, or has never been set: (re)start at base.
            if (this->pOutSendPointer_ == this->outputBuffer_.base + this->outputBuffer_.length ||
                !this->pOutSendPointer_)
            {
                this->pOutSendPointer_ = this->outputBuffer_.base;
                continue;
            }

            auto pBase      = (AuUInt8 *)this->pOutSendPointer_;
            auto pWriteHead = (AuUInt8 *)this->outputBuffer_.writePtr;
            auto pBufferEnd = (AuUInt8 *)this->outputBuffer_.base + this->outputBuffer_.length;

            // Cursor ahead of the write head means the data wraps: only the linear tail up to
            // the end of the buffer can be queued in this pass.
            auto pSpanEnd = (pBase > pWriteHead) ? pBufferEnd : pWriteHead;
            auto uLen     = pSpanEnd - pBase;

            auto pView = AuMakeShared(AuMemoryViewRead(pBase, uLen));
            SysAssert(pView);
            pView->pin = this->pParent_->SharedFromThis();

            this->outputWriteQueue_.Push(pView);
            this->pOutSendPointer_ = pSpanEnd;
        }
    }

    // Runs one locked write tick and, if that tick reports that the channel has drained with a
    // shutdown pending, shuts the parent down after the lock has been released.
    void SocketChannelOutput::WriteTick()
    {
        bool bShouldShutdown {};

        {
            AU_LOCK_GUARD(this->lock_);
            // Fix: the result used to be stored into the bShutdownOnComplete member, which left
            // bShouldShutdown permanently false (the Shutdown() below could never run) and
            // overwrote the pending-shutdown request on every tick.
            bShouldShutdown = WriteTickLocked();
        }

        if (bShouldShutdown)
        {
            if (this->pParent_)
            {
                this->pParent_->Shutdown(true);
            }
        }
    }

    // Dispatches the next queued frame, if any. Called by WriteTick() with lock_ held.
    //
    // Returns true only when the write queue is empty and bShutdownOnComplete is set, i.e. the
    // caller should now shut the parent down. Returns false in every other case, including
    // errors (those are reported through SendErrorBeginShutdown()).
    bool SocketChannelOutput::WriteTickLocked()
    {
        if (!this->pParent_)
        {
            return false;
        }

    #if defined(AURORA_IS_MODERNNT_DERIVED)
        auto pHackTransaction = AuStaticCast(this->pNetWriteTransaction_);
    #else
        auto pHackTransaction = AuStaticCast(this->pNetWriteTransaction_);
    #endif

        if (pHackTransaction->bIsWriting)
        {
            // IsComplete?
            // A write is marked in flight and bLatch is not set: leave it alone for now.
            if (!pHackTransaction->bLatch)
            {
                return false;
            }
        }

        if (this->outputWriteQueue_.IsEmpty())
        {
            // Disabled extra condition, kept for reference:
            //  this->outputBuffer_.outputChannel.AsWritableByteBuffer()->GetNextLinearRead().length == 0
            if (this->pParent_)
            {
                this->pParent_->socketChannel_.DoReallocWriteTick();
            }
        }

        // do not forcefully flush preemptive hello packets until the socket has properly connected
        if (!this->pParent_->bHasConnected_)
        {
            // Fix: this used to return true. Now that the return value drives Shutdown() in
            // WriteTick(), returning true here would tear down a socket that is still connecting.
            return false;
        }

        if (auto pFrameToSend = this->outputWriteQueue_.Dequeue())
        {
            if (this->pParent_)
            {
                this->pNetWriteTransaction_->SetCallback(AuSPtr(this->pParent_->SharedFromThis(), this));

                // The counter is decremented again in OnAsyncFileOpFinished(), or right below
                // if the write cannot even be started.
                if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
                {
                    SysPushErrorIO("Couldn't begin wait");
                    this->pParent_->SendErrorBeginShutdown({});
                    return false;
                }

                if (!this->pNetWriteTransaction_->StartWrite(0, pFrameToSend))
                {
                    this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
                    SysPushErrorIO("Couldn't dispatch the to-send frame");
                    this->pParent_->SendErrorBeginShutdown({});
                    return false;
                }
            }
        }
        else
        {
            // Nothing left to send: detach the completion callback.
            this->pNetWriteTransaction_->SetCallback({});

            if (this->bShutdownOnComplete)
            {
                return true;
            }
        }

        return false;
    }

    // Queues any newly written bytes and runs a write tick immediately on the calling thread.
    void SocketChannelOutput::OnEndOfReadTick()
    {
        SendIfData();
        WriteTick();
    }

    // Direct access to the underlying output buffer.
    AuByteBuffer &SocketChannelOutput::GetByteBuffer()
    {
        return this->outputBuffer_;
    }

    // The buffer may be resized only while the write queue is empty.
    bool SocketChannelOutput::CanResize()
    {
        return this->outputWriteQueue_.IsEmpty();
    }

    // Completion callback for a write started by WriteTickLocked().
    //  offset - not used by this handler.
    //  length - number of bytes reported written; zero is treated as an error and begins an
    //           error shutdown of the parent.
    // On success the queue is told how much was written, the buffer's read head is advanced by
    // the same amount, and another write tick is run.
    void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
    {
        if (!this->pParent_)
        {
            return;
        }

        // Balances the IncrementIOEventTaskCounter() done before StartWrite().
        this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();

        AuStaticCast(this->pParent_->ToChannel())->GetSendStatsEx().AddBytes(length);

        if (!length)
        {
            this->pParent_->SendErrorBeginShutdown({});
            return;
        }

        this->outputWriteQueue_.NotifyBytesWritten(length); // does the resend bit

        // NOTE(review): the read-head advance happens inside the assert expression; this relies
        // on SysAssert always evaluating its argument - confirm.
        SysAssert(this->outputBuffer_.ReaderTryGoForward(length));

        WriteTick();
    }
}