/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuNetSocketChannelInput.cpp Date: 2022-8-21 Author: Reece ***/ #include "Networking.hpp" #include "AuNetSocket.hpp" #include "AuNetSocketChannelInput.hpp" #include "AuNetWorker.hpp" #include "AuNetSocketChannel.hpp" #include namespace Aurora::IO::Net { SocketChannelInput::SocketChannelInput(SocketBase *pParent, const AuSPtr &asyncStream) : pParent_(pParent), pNetReadTransaction(asyncStream) { } void SocketChannelInput::OnReadTick() { if (!this->pNetReader) { return; } struct DispatchLater : IIOProcessorWorkUnit { inline DispatchLater(const AuSPtr &parent) : parent_(parent) { } inline virtual void OnRun() override { auto pReader = this->parent_->pNetReader; if (!pReader) { return; } AuStaticCast(pReader)->Tick_Any(); AuStaticCast(pReader)->Tick_FrameEpilogue(); } inline virtual void OnCanceled() override { } AuSPtr parent_; }; auto temp = AuMakeShared(AuSPtr(this->pParent_->SharedFromThis(), this)); if (!(temp && this->pParent_->ToWorker()->ToProcessor()->SubmitIOWorkItem(temp))) { AuStaticCast(this->pNetReader)->Tick_Any(); AuStaticCast(this->pNetReader)->Tick_FrameEpilogue(); } } AuSPtr SocketChannelInput::AsReadableByteBuffer() { if (!this->pNetReader) { return {}; } return AuSPtr(this->pParent_->SharedFromThis(), AuStaticCast(this->pNetReader)->GetBuffer()); } void SocketChannelInput::WarmOnEstablish() { auto sharedThis = AuSPtr(this->pParent_->SharedFromThis(), this); AuIO::IOPipeRequestAIO req; req.output.handleBufferedStream.pOnData = AuUnsafeRaiiToShared(this); req.output.type = EPipeCallbackType::eTryHandleBufferedPart; req.pAsyncTransaction = this->pNetReadTransaction; req.bIsStream = true; req.pListener = sharedThis; req.uBufferLengthOrZero = AuStaticCast(this->pParent_->ToChannel())->uBytesInputBuffer; this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(req); } void SocketChannelInput::OnEstablish() { auto sharedThis = AuSPtr(this->pParent_->SharedFromThis(), this); if (!this->pNetReader) { return; } if (!this->pNetReader->Start()) { this->pParent_->SendErrorNoStream({}); return; } this->pNetReadTransaction->SetCallback(sharedThis);//; AuSPtr(this->pParent_->SharedFromThis(), this)); IncrementWorker(); } bool SocketChannelInput::OnDataAvailable(Memory::ByteBuffer &view) { this->pParent_->SendOnData(); return true; // do not fuck with the read pointer - client was notified and did only what it wanted to do } void SocketChannelInput::OnPipePartialEvent(AuUInt transferred) { // Other method is used } void SocketChannelInput::OnPipeSuccessEvent() { this->pParent_->SendEnd(); DecrementWorker(); this->pNetReader.reset(); this->pNetReadTransaction.reset(); } void SocketChannelInput::OnPipeFailureEvent() { DecrementWorker(); this->pNetReader.reset(); this->pNetReadTransaction.reset(); } bool SocketChannelInput::IsValid() { return bool(this->pNetReadTransaction); } void SocketChannelInput::IncrementWorker() { if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter()) { this->pParent_->SendErrorBeginShutdown({}); } } void SocketChannelInput::DecrementWorker() { if (this->pParent_) { this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter(); } if (this->pNetReadTransaction) { this->pNetReadTransaction->SetCallback({}); } } void SocketChannelInput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) { OnReadTick(); } }