/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: AuNetSocketChannelInput.cpp
    Date: 2022-8-21
    Author: Reece
***/
// NOTE(review): this copy of the file appears to have lost every `<...>` span
// (template argument lists on AuSPtr / AuStaticCast / AuMakeShared and the
// target of the bare #include below). The tokens are reproduced as found;
// restore the missing arguments from source control before building.
#include "Networking.hpp"
#include "AuNetSocket.hpp"
#include "AuNetSocketChannelInput.hpp"
#include "AuNetWorker.hpp"
#include "AuNetSocketChannel.hpp"
#include
#include "AuNetError.hpp"

namespace Aurora::IO::Net
{
    /// Stores the owning socket and the async transaction that reads are issued on.
    SocketChannelInput::SocketChannelInput(SocketBase *pParent,
                                           const AuSPtr &asyncStream) :
        pParent_(pParent),
        pNetReadTransaction(asyncStream)
    {
    }

    /// Asks the channel to perform its realloc-read tick, then schedules a
    /// pipe tick on the worker's IO processor. If the work item cannot be
    /// created or submitted, the tick is run immediately on the caller instead.
    void SocketChannelInput::OnReadTick()
    {
        if (!this->pNetReader)
        {
            return;
        }

        AuStaticCast(this->pParent_->ToChannel())->DoReallocReadTick();

        // Work unit that ticks the pipe reader when run, provided the reader
        // has not been released in the meantime.
        struct DispatchLater : IIOProcessorWorkUnit
        {
            DispatchLater(const AuSPtr &parent) :
                parent_(parent)
            {
            }

            void OnRun() override
            {
                if (auto pPipe = this->parent_->pNetReader)
                {
                    AuStaticCast(pPipe)->Tick_Any();
                    AuStaticCast(pPipe)->Tick_FrameEpilogue();
                }
            }

            void OnCanceled() override
            {
            }

            AuSPtr parent_;
        };

        auto pDeferred = AuMakeShared(AuSPtr(this->pParent_->SharedFromThis(), this));
        if (pDeferred &&
            this->pParent_->ToWorker()->ToProcessor()->SubmitIOWorkItem(pDeferred))
        {
            return;
        }

        // Could not defer: report it and tick inline.
        SysPushErrorIO("Preemptively running tick. low resource?");
        AuStaticCast(this->pNetReader)->Tick_Any();
        AuStaticCast(this->pNetReader)->Tick_FrameEpilogue();
    }

    /// Returns the pipe reader's buffer in a pointer constructed together with
    /// the parent's shared pointer, or an empty pointer when there is no reader.
    AuSPtr SocketChannelInput::AsReadableByteBuffer()
    {
        if (this->pNetReader)
        {
            return AuSPtr(this->pParent_->SharedFromThis(),
                          AuStaticCast(this->pNetReader)->GetBuffer());
        }

        return {};
    }

    /// Builds the AIO pipe request for the read transaction and creates the
    /// pipe reader through the worker's pipe processor. The pipe is not
    /// started here; see OnEstablish.
    void SocketChannelInput::WarmOnEstablish()
    {
        auto pSelf = AuSPtr(this->pParent_->SharedFromThis(), this);

        AuIO::IOPipeRequestAIO request;
        request.pAsyncTransaction = this->pNetReadTransaction;
        request.bIsStream = true;
        request.pListener = pSelf;
        request.uBufferLengthOrZero = AuStaticCast(this->pParent_->ToChannel())->uBytesInputBuffer;
        request.output.type = EPipeCallbackType::eTryHandleBufferedPart;
        request.output.handleBufferedStream.pOnData = AuUnsafeRaiiToShared(this);

        this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(request);
    }

    /// Starts the previously created pipe reader. Does nothing when no reader
    /// exists. When the read transaction is missing or the pipe fails to
    /// start, an error is sent to the parent; on success the worker's IO event
    /// task counter is incremented.
    void SocketChannelInput::OnEstablish()
    {
        auto pSelf = AuSPtr(this->pParent_->SharedFromThis(), this);

        auto pPipe = this->pNetReader;
        if (!pPipe)
        {
            return;
        }

        AuStaticCast(pPipe)->bShouldBypassIOWatch = true;

        if (auto pTransaction = this->pNetReadTransaction)
        {
            pTransaction->SetCallback(pSelf);

            if (pPipe->Start())
            {
                this->IncrementWorker();
                return;
            }
        }

        // Either there is no transaction or the pipe refused to start.
        this->pParent_->SendErrorNoStream({});
    }

    /// Forwards the data notification to the parent socket.
    bool SocketChannelInput::OnDataAvailable(Memory::ByteBuffer &view)
    {
        this->pParent_->SendOnData();

        // Leave the read pointer alone: the client has been notified and
        // consumed only what it wanted to.
        return true;
    }

    /// Data delivery goes through OnDataAvailable; this callback only checks
    /// the readable buffer for a stream error and begins shutdown if one is set.
    void SocketChannelInput::OnPipePartialEvent(AuUInt transferred)
    {
        auto pBuffer = this->AsReadableByteBuffer();
        if (pBuffer && pBuffer->HasStreamError())
        {
            this->pParent_->SendErrorBeginShutdown(AuNet::ENetworkError::eBrokenByteBuffer);
        }
    }

    /// Intentionally empty.
    void SocketChannelInput::OnPipeReallocEvent(bool bSuccess)
    {
    }

    /// Pipe finished cleanly: releases the worker counter, signals end of
    /// stream to the parent and drops the reader and the read transaction.
    void SocketChannelInput::OnPipeSuccessEvent()
    {
        this->DecrementWorker();

        // Query the parent's error now so that the error or shutdown code is
        // cached before the last read transaction goes away.
        (void)this->pParent_->GetError();

        this->pParent_->SendEnd();

        this->pNetReader.reset();
        this->pNetReadTransaction.reset();
    }

    /// Pipe failed: releases the worker counter, reports either the
    /// transaction's OS error or eBrokenAIOPipe to the parent, then drops the
    /// reader and the read transaction.
    void SocketChannelInput::OnPipeFailureEvent()
    {
        NetError error;

        this->DecrementWorker();

        const bool bTransactionFailed = this->pNetReadTransaction &&
                                        this->pNetReadTransaction->HasFailed();

        if (bTransactionFailed)
        {
            NetError_SetOsError(error, this->pNetReadTransaction->GetOSErrorCode());
        }
        else
        {
            // Query the parent's error now so that the error or shutdown code
            // is cached before the last read transaction goes away.
            (void)this->pParent_->GetError();

            error = ENetworkError::eBrokenAIOPipe;
        }

        this->pParent_->SendErrorNoStream(error);

        this->pNetReader.reset();
        this->pNetReadTransaction.reset();
    }

    /// True while a read transaction is held.
    bool SocketChannelInput::IsValid()
    {
        return static_cast<bool>(this->pNetReadTransaction);
    }

    /// Increments the worker's IO event task counter; begins shutdown of the
    /// parent if the increment is refused.
    void SocketChannelInput::IncrementWorker()
    {
        if (this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
        {
            return;
        }

        this->pParent_->SendErrorBeginShutdown({});
    }

    /// Decrements the worker's IO event task counter (when a parent is set)
    /// and clears the callback on the read transaction (when one is held).
    void SocketChannelInput::DecrementWorker()
    {
        if (auto *pOwner = this->pParent_)
        {
            pOwner->ToWorkerEx()->DecrementIOEventTaskCounter();
        }

        if (auto &pTransaction = this->pNetReadTransaction)
        {
            pTransaction->SetCallback({});
        }
    }

    /// Completion callback of the read transaction; drives a read tick.
    void SocketChannelInput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
    {
        this->OnReadTick();
    }
}