/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AuNetSocketChannelOutput.cpp
    Date: 2022-8-21
    Author: Reece
***/
// NOTE(review): in this copy of the file one #include target and several template
// argument lists (on AuSPtr / AuStaticCast / AuMakeShared / TryScheduleInternalTemplate)
// are absent. They are reproduced exactly as found and must be restored from version
// control before this translation unit can build.
#include "Networking.hpp"
#include "AuNetSocket.hpp"
#include "AuNetSocketChannelOutput.hpp"
#include "AuNetWorker.hpp"
#include
#if defined(AURORA_IS_MODERNNT_DERIVED)
#include "AuNetStream.NT.hpp"
#else
#include "AuNetStream.Linux.hpp"
#endif

// Size, in bytes, passed to the output byte buffer's constructor.
static const auto kDefaultBufferSize = 64 * 1024;

namespace Aurora::IO::Net
{
    // Binds the output channel to its owning socket and to the transaction used for writes.
    //
    // pParent - owning socket; stored as a raw pointer and also handed to the write queue.
    // stream  - write transaction used by WriteTickLocked() to start writes.
    //
    // The buffer is created with kDefaultBufferSize and flagged so that it is not reallocated.
    // (The meaning of the second constructor argument `true` is not visible here -- TODO confirm.)
    SocketChannelOutput::SocketChannelOutput(SocketBase *pParent, const AuSPtr &stream) :
        pParent_(pParent),
        pNetWriteTransaction_(stream),
        outputBuffer_(kDefaultBufferSize, true)
    {
        this->outputWriteQueue_.pBase = pParent;
        this->outputBuffer_.flagNoRealloc = true;
    }

    // Returns true when both the write transaction and the output buffer evaluate to true.
    bool SocketChannelOutput::IsValid()
    {
        return bool(this->pNetWriteTransaction_) &&
               bool(this->outputBuffer_);
    }

    // Returns a shared pointer to the write transaction supplied at construction.
    AuSPtr SocketChannelOutput::ToWriteTransaction()
    {
        return AuSPtr(this->pNetWriteTransaction_);
    }

    // Returns a shared pointer to the output buffer, or an empty pointer when there is no parent.
    // The pointer is built from the parent's SharedFromThis() and the address of the buffer member
    // (presumably an aliasing pointer that keeps the parent alive -- confirm against AuSPtr).
    AuSPtr SocketChannelOutput::AsWritableByteBuffer()
    {
        if (!this->pParent_)
        {
            return {};
        }

        return AuSPtr(this->pParent_->SharedFromThis(), &this->outputBuffer_);
    }

    // Queues any newly written bytes and then requests a write tick.
    void SocketChannelOutput::ScheduleOutOfFrameWrite()
    {
        SendIfData();
        SchedWriteTick();
    }

    // Runs WriteTick() immediately when called on the worker's own thread.
    // Does nothing when there is no parent.
    void SocketChannelOutput::SchedWriteTick()
    {
        if (!this->pParent_)
        {
            return;
        }

        auto pWorker = this->pParent_->ToWorkerEx();

        // NOTE(review): with the guard below commented out, this block returns unconditionally,
        // which leaves the scheduling code after it unreachable. Confirm whether the `return`
        // was meant to sit inside the IsOnThread() branch.
        //if (this->pParent_->bHasFinalized_)
        {
            if (pWorker->IsOnThread())
            {
                WriteTick();
            }
            return;
        }

        auto meShared = AuSPtr(this->pParent_->SharedFromThis(), this);

        if (!pWorker->TryScheduleInternalTemplate([=](const AuSPtr> &info)
        {
            meShared->WriteTick();
            info->OnSuccess((void *)nullptr);
        }, AuStaticCast>(AuMakeShared>([=](const AuSPtr &dumb)
        {
        }))))
        {
            SysPushErrorIO("Couldn't schedule write tick");
            this->pParent_->SendErrorBeginShutdown(AuNet::ENetworkError::eAsyncError);
        }
    }

    // Under lock_, turns the bytes between pOutSendPointer_ and the buffer's write pointer into
    // read-only memory views and pushes them onto the write queue.
    //
    // When the send pointer is ahead of the write pointer, the region up to the end of the buffer
    // (base + length) is queued first and the send pointer then restarts at base.
    // Each queued view increments the buffer's uInUseCounter; OnAsyncFileOpFinished() decrements it.
    void SocketChannelOutput::SendIfData()
    {
        AU_LOCK_GUARD(this->lock_);

        if (!this->pParent_)
        {
            return;
        }

        while (this->pOutSendPointer_ != this->outputBuffer_.writePtr)
        {
            if (this->pOutSendPointer_ == this->outputBuffer_.writePtr)
            {
                break;
            }

            // TODO: replace legacy code with circular buffer safe utils?

            // Send pointer reached the end of the buffer: wrap to the start.
            if (this->pOutSendPointer_ == this->outputBuffer_.base + this->outputBuffer_.length)
            {
                this->pOutSendPointer_ = this->outputBuffer_.base;
                continue;
            }

            // First use: start from the beginning of the buffer.
            if (!this->pOutSendPointer_)
            {
                this->pOutSendPointer_ = this->outputBuffer_.base;
                continue;
            }

            if ((AuUInt8 *)this->pOutSendPointer_ > this->outputBuffer_.writePtr)
            {
                // Write pointer is behind us: queue the tail of the buffer.
                auto pBase = (AuUInt8 *)this->pOutSendPointer_;
                auto uLen = (AuUInt8 *)this->outputBuffer_.base + this->outputBuffer_.length - pBase;

                // do not lock!
                auto mem = AuMemoryViewRead { AuMemoryViewRead { pBase, (AuUInt)uLen }, this->pParent_->SharedFromThis() };

                this->pOutSendPointer_ = (AuUInt8 *)this->pOutSendPointer_ + uLen;
                this->outputWriteQueue_.Push(mem);
                AuAtomicAdd(&this->outputBuffer_.uInUseCounter, 1u);
            }
            else
            {
                // Contiguous region: queue everything up to the write pointer.
                auto pBase = (AuUInt8 *)this->pOutSendPointer_;
                auto pWriteHead = (AuUInt8 *)this->outputBuffer_.writePtr;
                auto uLen = pWriteHead - pBase;

                // do not lock!
                auto mem = AuMemoryViewRead { AuMemoryViewRead { pBase, (AuUInt)uLen }, this->pParent_->SharedFromThis() };

                this->pOutSendPointer_ = pWriteHead;
                this->outputWriteQueue_.Push(mem);
                AuAtomicAdd(&this->outputBuffer_.uInUseCounter, 1u);
            }
        }
    }

    // Runs WriteTickLocked() while holding lock_. If it asks for a shutdown, the parent's
    // Shutdown(true) is called after the lock has been released.
    void SocketChannelOutput::WriteTick()
    {
        bool bShouldShutdown {};

        {
            AU_LOCK_GUARD(this->lock_);
            bShouldShutdown = WriteTickLocked();
        }

        if (bShouldShutdown)
        {
            if (this->pParent_)
            {
                this->pParent_->Shutdown(true);
            }
        }
    }

    // Starts the next queued write, if any. Called by WriteTick() with lock_ held.
    //
    // Returns true when the caller should shut the socket down: the queue is empty and either the
    // output buffer reports a stream error or bShutdownOnComplete is set. Returns false otherwise,
    // including on the error paths that already called SendErrorBeginShutdown().
    bool SocketChannelOutput::WriteTickLocked()
    {
        if (!this->pParent_)
        {
            return false;
        }

    #if defined(AURORA_IS_MODERNNT_DERIVED)
        auto pHackTransaction = AuStaticCast(this->pNetWriteTransaction_);
    #else
        auto pHackTransaction = AuStaticCast(this->pNetWriteTransaction_);
    #endif

        // A write is still outstanding and has not latched yet: nothing to do this tick.
        if (pHackTransaction->bIsWriting)
        {
            // IsComplete?
            if (!pHackTransaction->bLatch)
            {
                return false;
            }
        }

        // Nothing queued and nothing in flight: the channel may perform its pending realloc step.
        if (this->CanResize() && this->pParent_)
        {
            this->pParent_->socketChannel_.DoReallocWriteTick();
        }

        // do not forcefully flush preemptive hello packets until the socket has properly connected
        if (!this->pParent_->bHasConnected_)
        {
            return false;
        }

        if (auto pFrameToSend = this->outputWriteQueue_.Dequeue())
        {
            if (this->pParent_)
            {
                this->pNetWriteTransaction_->SetCallback(AuSPtr(this->pParent_->SharedFromThis(), this));

                if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
                {
                    SysPushErrorIO("Couldn't begin wait");
                    this->pParent_->SendErrorBeginShutdown({});
                    return false;
                }

                if (!this->pNetWriteTransaction_->StartWrite(0, pFrameToSend))
                {
                    // Undo the increment above; no completion callback will balance it.
                    this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
                    SysPushErrorIO("Couldn't dispatch the to-send frame, had: {} bytes remaining to send", pFrameToSend.length);
                    this->pParent_->SendErrorBeginShutdown({});
                    return false;
                }

                // Balanced by the decrement in OnAsyncFileOpFinished().
                AuAtomicAdd(&this->uCompleteCounter_, 1u);
            }
        }
        else
        {
            this->pNetWriteTransaction_->SetCallback({});

            if (this->outputBuffer_.HasStreamError())
            {
                this->pParent_->SendErrorBeginShutdown(AuNet::ENetworkError::eBrokenByteBuffer);
                return true;
            }

            if (this->bShutdownOnComplete)
            {
                return true;
            }
        }

        return false;
    }

    // Queues any newly written bytes and runs a write tick on the calling thread.
    void SocketChannelOutput::OnEndOfReadTick()
    {
        SendIfData();
        WriteTick();
    }

    // Returns a reference to the output buffer member.
    AuByteBuffer &SocketChannelOutput::GetByteBuffer()
    {
        return this->outputBuffer_;
    }

    // True when the write queue is empty and no started write is awaiting completion.
    bool SocketChannelOutput::CanResize()
    {
        return this->outputWriteQueue_.IsEmpty() &&
               AuAtomicLoad(&this->uCompleteCounter_) == 0;
    }

    // True when CanResize() holds and the output buffer reports no remaining bytes.
    bool SocketChannelOutput::CanShutdown()
    {
        // Query the buffer member directly: AsWritableByteBuffer() returns an empty pointer when
        // pParent_ is null, and dereferencing that result here would be a null dereference.
        return this->CanResize() &&
               !this->outputBuffer_.RemainingBytes();
    }

    // Completion callback for a write started by WriteTickLocked().
    //
    // offset - unused by this function.
    // length - number of bytes reported as written; zero is treated as an error and begins shutdown.
    //
    // Releases the in-use and completion counters, records the byte count in the send statistics,
    // advances the write queue and the buffer's read position, then runs another write tick.
    void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
    {
        AuAtomicSub(&this->outputBuffer_.uInUseCounter, 1u);
        AuAtomicSub(&this->uCompleteCounter_, 1u);

        if (!this->pParent_)
        {
            return;
        }

        this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();

        AuStaticCast(this->pParent_->ToChannel())->GetSendStatsEx().AddBytes(length);

        if (auto pServerSendStats = AuStaticCast(this->pParent_->ToChannel())->GetSendStatsEx2())
        {
            pServerSendStats->AddBytes(length);
        }

        if (!length)
        {
            this->pParent_->SendErrorBeginShutdown({});
            return;
        }

        this->outputWriteQueue_.NotifyBytesWritten(length); // does the resend bit

        // NOTE(review): the read-pointer advance happens inside the assertion expression;
        // this relies on SysAssert always evaluating its argument -- confirm.
        SysAssert(this->outputBuffer_.ReaderTryGoForward(length));

        WriteTick();
    }
}