AuroraRuntime/Source/IO/Net/AuNetSocketChannelInput.cpp

223 lines
6.3 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetSocketChannelInput.cpp
Date: 2022-8-21
Author: Reece
***/
#include "Networking.hpp"
#include "AuNetSocket.hpp"
#include "AuNetSocketChannelInput.hpp"
#include "AuNetWorker.hpp"
#include "AuNetSocketChannel.hpp"
#include <Source/IO/AuIOPipeProcessor.hpp>
#include "AuNetError.hpp"
namespace Aurora::IO::Net
{
// Constructs the input side of a socket channel.
//
// pParent     - parent socket; stored as a raw pointer (no reference is taken here).
// asyncStream - asynchronous transaction that read operations will be driven from.
//
// Only the two members are recorded. The pipe reader (pNetReader) is not
// created by the constructor; see WarmOnEstablish().
SocketChannelInput::SocketChannelInput(SocketBase *pParent,
                                       const AuSPtr<IAsyncTransaction> &asyncStream) :
    pParent_(pParent),
    pNetReadTransaction(asyncStream)
{
}
// Drives one read tick of the input pipe. Called from OnAsyncFileOpFinished().
//
// Steps:
//  1. Bail out if no pipe reader exists.
//  2. Let the owning channel run its DoReallocReadTick().
//  3. Queue a work unit on the worker's IO processor that ticks the pipe
//     (Tick_Any followed by Tick_FrameEpilogue).
//  4. If the work unit could not be allocated or submitted, push an IO error
//     and tick the pipe immediately on the calling thread instead.
void SocketChannelInput::OnReadTick()
{
    if (!this->pNetReader)
    {
        return;
    }

    AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->DoReallocReadTick();

    // Deferred tick. Holds a shared pointer to this object (sharing the
    // parent's ownership) so the object is still alive when the unit runs.
    struct DispatchLater : IIOProcessorWorkUnit
    {
        inline DispatchLater(const AuSPtr<SocketChannelInput> &parent) :
            parent_(parent)
        {
        }

        inline virtual void OnRun() override
        {
            // Local copy: the member may have been reset since submission,
            // and may be reset again while we are ticking.
            auto pReader = this->parent_->pNetReader;
            if (!pReader)
            {
                return;
            }

            AuStaticCast<IOPipeWork>(pReader)->Tick_Any();
            AuStaticCast<IOPipeWork>(pReader)->Tick_FrameEpilogue();
        }

        inline virtual void OnCanceled() override
        {
        }

        AuSPtr<SocketChannelInput> parent_;
    };

    auto temp = AuMakeShared<DispatchLater>(AuSPtr<SocketChannelInput>(this->pParent_->SharedFromThis(), this));
    if (!(temp && this->pParent_->ToWorker()->ToProcessor()->SubmitIOWorkItem(temp)))
    {
        SysPushErrorIO("Preemptively running tick. low resource?");

        // Mirror DispatchLater::OnRun: tick through a local reference rather
        // than the member. OnPipeSuccessEvent/OnPipeFailureEvent reset
        // this->pNetReader, so if the first tick ends up delivering one of
        // them, re-reading the member for the epilogue would dereference an
        // empty pointer.
        auto pReader = this->pNetReader;
        if (pReader)
        {
            AuStaticCast<IOPipeWork>(pReader)->Tick_Any();
            AuStaticCast<IOPipeWork>(pReader)->Tick_FrameEpilogue();
        }
    }
}
// Exposes the pipe's buffer as a shared ByteBuffer pointer.
//
// Returns an empty pointer when no pipe reader exists. Otherwise the result
// is built with the two-argument (owner, pointer) form: it points at the
// buffer returned by the pipe's GetBuffer() while sharing ownership with the
// parent socket's SharedFromThis().
AuSPtr<Memory::ByteBuffer> SocketChannelInput::AsReadableByteBuffer()
{
    if (this->pNetReader)
    {
        auto pPipeBuffer = AuStaticCast<IOPipeWork>(this->pNetReader)->GetBuffer();
        return AuSPtr<Memory::ByteBuffer>(this->pParent_->SharedFromThis(), pPipeBuffer);
    }

    return {};
}
void SocketChannelInput::WarmOnEstablish()
{
auto sharedThis = AuSPtr<SocketChannelInput>(this->pParent_->SharedFromThis(), this);
AuIO::IOPipeRequestAIO req;
req.output.handleBufferedStream.pOnData = AuUnsafeRaiiToShared(this);
req.output.type = EPipeCallbackType::eTryHandleBufferedPart;
req.pAsyncTransaction = this->pNetReadTransaction;
req.bIsStream = true;
req.pListener = sharedThis;
req.uBufferLengthOrZero = AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->uBytesInputBuffer;
this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(req);
}
// Starts the pipe created by WarmOnEstablish() and subscribes this object to
// the read transaction's callback.
//
// - No pipe reader: returns silently.
// - Start() fails, or there is no read transaction: reports through the
//   parent's SendErrorNoStream({}) and returns.
// - Otherwise: installs the callback and calls IncrementWorker().
void SocketChannelInput::OnEstablish()
{
    // Taken first so the owner-sharing reference spans the whole call.
    auto pSelf = AuSPtr<SocketChannelInput>(this->pParent_->SharedFromThis(), this);

    auto pPipe = this->pNetReader;
    if (!pPipe)
    {
        return;
    }

    if (pPipe->Start())
    {
        // The transaction is only read after the pipe has been started.
        if (auto pTransaction = this->pNetReadTransaction)
        {
            pTransaction->SetCallback(pSelf);
            IncrementWorker();
            return;
        }
    }

    // Either the pipe refused to start or there is no transaction to listen on.
    this->pParent_->SendErrorNoStream({});
}
// Buffered-data callback of the pipe: forwards the notification to the parent
// socket via SendOnData(). The supplied view is not inspected or modified.
bool SocketChannelInput::OnDataAvailable(Memory::ByteBuffer &view)
{
    this->pParent_->SendOnData();

    // Leave the read pointer alone: the client has been notified and consumed
    // only as much as it wanted to.
    return true;
}
// Partial-transfer notification from the pipe. The transferred byte count is
// not used here (data delivery presumably goes through OnDataAvailable());
// this hook only checks the readable buffer for a stream error and, if one is
// flagged, begins a shutdown with eBrokenByteBuffer.
void SocketChannelInput::OnPipePartialEvent(AuUInt transferred)
{
    auto pReadable = this->AsReadableByteBuffer();
    if (!pReadable)
    {
        return;
    }

    if (!pReadable->HasStreamError())
    {
        return;
    }

    this->pParent_->SendErrorBeginShutdown(AuNet::ENetworkError::eBrokenByteBuffer);
}
// Pipe reallocation notification. Currently a no-op: bSuccess is ignored.
void SocketChannelInput::OnPipeReallocEvent(bool bSuccess)
{
}
// The pipe finished successfully: release the worker task count, notify the
// parent that the stream ended, then drop the reader and its transaction.
void SocketChannelInput::OnPipeSuccessEvent()
{
    DecrementWorker();

    // Query the parent's error once, before the last read transaction is
    // released, so the error or shutdown code gets cached. The result itself
    // is deliberately discarded.
    static_cast<void>(this->pParent_->GetError());

    this->pParent_->SendEnd();

    // Release the pipe first, then the transaction it was reading from.
    this->pNetReader.reset();
    this->pNetReadTransaction.reset();
}
// The pipe failed: release the worker task count, work out which error to
// report, send it to the parent via SendErrorNoStream(), then drop the reader
// and its transaction.
//
// Error selection:
//  - a read transaction exists and reports HasFailed(): its OS error code;
//  - otherwise: ENetworkError::eBrokenAIOPipe.
void SocketChannelInput::OnPipeFailureEvent()
{
    NetError error;

    DecrementWorker();

    const bool bTransactionFailed = this->pNetReadTransaction &&
                                    this->pNetReadTransaction->HasFailed();

    if (bTransactionFailed)
    {
        NetError_SetOsError(error, this->pNetReadTransaction->GetOSErrorCode());
    }
    else
    {
        // Query the parent's error before the last read transaction is
        // released, so the error or shutdown code gets cached; the result is
        // deliberately discarded.
        (void)this->pParent_->GetError();

        error = ENetworkError::eBrokenAIOPipe;
    }

    this->pParent_->SendErrorNoStream(error);

    this->pNetReader.reset();
    this->pNetReadTransaction.reset();
}
// True while a read transaction is held; false once it has been reset
// (see OnPipeSuccessEvent / OnPipeFailureEvent) or was never provided.
bool SocketChannelInput::IsValid()
{
    return static_cast<bool>(this->pNetReadTransaction);
}
// Bumps the worker's IO event task counter. If the worker rejects the
// increment, the parent socket is told to begin an error shutdown.
void SocketChannelInput::IncrementWorker()
{
    if (this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
    {
        return;
    }

    this->pParent_->SendErrorBeginShutdown({});
}
// Counterpart of IncrementWorker(): lowers the worker's IO event task counter
// (when a parent is set) and detaches the callback from the read transaction
// (when one is held).
void SocketChannelInput::DecrementWorker()
{
    if (auto pSocket = this->pParent_)
    {
        pSocket->ToWorkerEx()->DecrementIOEventTaskCounter();
    }

    auto &pTransaction = this->pNetReadTransaction;
    if (pTransaction)
    {
        // An empty callback unsubscribes this object from the transaction.
        pTransaction->SetCallback({});
    }
}
// Completion callback of the read transaction (installed in OnEstablish()).
// The reported offset and length are not used; every completion simply
// triggers a read tick.
void SocketChannelInput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
{
    this->OnReadTick();
}
}