210 lines
6.2 KiB
C++
210 lines
6.2 KiB
C++
|
/***
|
||
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||
|
|
||
|
File: AuNetSocketChannelOutput.cpp
|
||
|
Date: 2022-8-21
|
||
|
Author: Reece
|
||
|
***/
|
||
|
#include "Networking.hpp"
|
||
|
#include "AuNetSocket.hpp"
|
||
|
#include "AuNetSocketChannelOutput.hpp"
|
||
|
#include "AuNetWorker.hpp"
|
||
|
#include <Source/IO/IOPipeProcessor.hpp>
|
||
|
|
||
|
#if defined(AURORA_IS_MODERNNT_DERIVED)
|
||
|
#include "AuNetStream.NT.hpp"
|
||
|
#else
|
||
|
#include "AuNetStream.Linux.hpp"
|
||
|
#endif
|
||
|
|
||
|
// Initial size, in bytes, handed to the output ByteBuffer of every SocketChannelOutput (64 KiB).
static constexpr auto kDefaultBufferSize = 64 * 1024;
|
||
|
|
||
|
namespace Aurora::IO::Net
|
||
|
{
|
||
|
// Builds the output half of a socket channel.
//
// pParent - the owning socket; stored as a raw pointer and later used to obtain
//           shared references (SharedFromThis) and the worker (ToWorkerEx).
// stream  - the asynchronous transaction that writes are issued through.
//
// The output buffer is created with kDefaultBufferSize bytes; the second
// constructor argument is `true` (presumably selecting ring/circular mode,
// which SendIfData's wrap-around handling relies on - confirm against ByteBuffer).
SocketChannelOutput::SocketChannelOutput(SocketBase *pParent,
                                         const AuSPtr<IAsyncTransaction> &stream)
    : pParent_(pParent)
    , pNetWriteTransaction_(stream)
    , outputBuffer_(kDefaultBufferSize, true)
{
}
|
||
|
|
||
|
bool SocketChannelOutput::IsValid()
|
||
|
{
|
||
|
return bool(this->pNetWriteTransaction_) &&
|
||
|
bool(this->outputBuffer_);
|
||
|
}
|
||
|
|
||
|
// Returns a shared reference to the transaction used for network writes.
AuSPtr<IAsyncTransaction> SocketChannelOutput::ToWriteTransaction()
{
    AuSPtr<IAsyncTransaction> pTransaction(this->pNetWriteTransaction_);
    return pTransaction;
}
|
||
|
|
||
|
// Exposes the internal output buffer as a shared pointer.
//
// The returned pointer addresses this object's outputBuffer_ member but is
// constructed from the parent socket's shared reference, so holding it keeps
// the parent referenced rather than owning the buffer separately.
AuSPtr<Memory::ByteBuffer> SocketChannelOutput::AsWritableByteBuffer()
{
    AuSPtr<Memory::ByteBuffer> pBuffer(this->pParent_->SharedFromThis(),
                                       &this->outputBuffer_);
    return pBuffer;
}
|
||
|
|
||
|
void SocketChannelOutput::ScheduleOutOfFrameWrite()
|
||
|
{
|
||
|
SendIfData();
|
||
|
SchedWriteTick();
|
||
|
}
|
||
|
|
||
|
void SocketChannelOutput::SchedWriteTick()
|
||
|
{
|
||
|
auto pWorker = this->pParent_->ToWorkerEx();
|
||
|
|
||
|
if (this->pParent_->bHasFinalized_)
|
||
|
{
|
||
|
if (pWorker->IsOnThread())
|
||
|
{
|
||
|
WriteTick();
|
||
|
}
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
auto meShared = AuSPtr<SocketChannelOutput>(this->pParent_->SharedFromThis(), this);
|
||
|
|
||
|
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
|
||
|
{
|
||
|
meShared->WriteTick();
|
||
|
|
||
|
info->OnSuccess((void *)nullptr);
|
||
|
}, AuStaticCast<AuAsync::PromiseCallback<AuNullS, AuNullS>>(AuMakeShared<AuAsync::PromiseCallbackFunctional<AuNullS, AuNullS>>([=](const AuSPtr<AuNullS> &dumb)
|
||
|
{
|
||
|
}))))
|
||
|
{
|
||
|
SysPushErrorIO("Couldn't schedule write tick");
|
||
|
this->pParent_->SendErrorBeginShutdown({});
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void SocketChannelOutput::SendIfData()
|
||
|
{
|
||
|
AU_LOCK_GUARD(this->lock_);
|
||
|
struct View : AuMemoryViewRead
|
||
|
{
|
||
|
View(const AuMemoryViewRead &in) : AuMemoryViewRead(in)
|
||
|
{
|
||
|
}
|
||
|
AuSPtr<void> pin;
|
||
|
};
|
||
|
|
||
|
while (this->pOutSendPointer_ != this->outputBuffer_.writePtr)
|
||
|
{
|
||
|
if (this->pOutSendPointer_ == this->outputBuffer_.writePtr)
|
||
|
{
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
if (this->pOutSendPointer_ == this->outputBuffer_.base + this->outputBuffer_.length)
|
||
|
{
|
||
|
this->pOutSendPointer_ = this->outputBuffer_.base;
|
||
|
continue;
|
||
|
}
|
||
|
|
||
|
if (!this->pOutSendPointer_)
|
||
|
{
|
||
|
this->pOutSendPointer_ = this->outputBuffer_.base;
|
||
|
continue;
|
||
|
}
|
||
|
|
||
|
if ((AuUInt8 *)this->pOutSendPointer_ > this->outputBuffer_.writePtr)
|
||
|
{
|
||
|
auto pBase = (AuUInt8 *)this->pOutSendPointer_;
|
||
|
auto uLen = (AuUInt8 *)this->outputBuffer_.base + this->outputBuffer_.length - pBase;
|
||
|
|
||
|
auto pView = AuMakeShared<View>(AuMemoryViewRead(pBase, uLen));
|
||
|
SysAssert(pView);
|
||
|
pView->pin = this->pParent_->SharedFromThis();
|
||
|
|
||
|
this->outputWriteQueue_.Push(pView);
|
||
|
|
||
|
this->pOutSendPointer_ = (AuUInt8 *)this->pOutSendPointer_ + uLen;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
auto pBase = (AuUInt8 *)this->pOutSendPointer_;
|
||
|
auto pWriteHead = (AuUInt8 *)this->outputBuffer_.writePtr;
|
||
|
auto uLen = pWriteHead - pBase;
|
||
|
|
||
|
auto pView = AuMakeShared<View>(AuMemoryViewRead(pBase, uLen));
|
||
|
SysAssert(pView);
|
||
|
pView->pin = this->pParent_->SharedFromThis();
|
||
|
|
||
|
this->outputWriteQueue_.Push(pView);
|
||
|
this->pOutSendPointer_ = pWriteHead ;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void SocketChannelOutput::WriteTick()
|
||
|
{
|
||
|
AU_LOCK_GUARD(this->lock_);
|
||
|
|
||
|
#if defined(AURORA_IS_MODERNNT_DERIVED)
|
||
|
auto pHackTransaction =
|
||
|
AuStaticCast<NtAsyncNetworkTransaction>(this->pNetWriteTransaction_);
|
||
|
#else
|
||
|
auto pHackTransaction =
|
||
|
AuStaticCast<LinuxAsyncNetworkTransaction>(this->pNetWriteTransaction_);
|
||
|
#endif
|
||
|
|
||
|
if (pHackTransaction->bIsWriting)
|
||
|
{
|
||
|
// IsComplete?
|
||
|
if (!pHackTransaction->bLatch)
|
||
|
{
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (auto pFrameToSend = this->outputWriteQueue_.Dequeue())
|
||
|
{
|
||
|
this->pNetWriteTransaction_->SetCallback(AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
|
||
|
|
||
|
if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
|
||
|
{
|
||
|
SysPushErrorIO("Couldn't begin wait");
|
||
|
this->pParent_->SendErrorBeginShutdown({});
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (!this->pNetWriteTransaction_->StartWrite(0, pFrameToSend))
|
||
|
{
|
||
|
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
|
||
|
SysPushErrorIO("Couldn't dispatch the to-send frame");
|
||
|
this->pParent_->SendErrorBeginShutdown({});
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
this->pNetWriteTransaction_->SetCallback({});
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void SocketChannelOutput::OnEndOfReadTick()
|
||
|
{
|
||
|
SendIfData();
|
||
|
WriteTick();
|
||
|
}
|
||
|
|
||
|
// Completion callback for a write started by WriteTick.
//
// offset - unused here.
// length - number of bytes reported as written; zero is treated as a failure.
//
// Always releases the IO event task counter taken before the write was
// started. On a zero-length completion the parent begins an error shutdown.
// Otherwise the write queue is told how many bytes went out, the output
// buffer's reader is advanced by the same amount, and the next frame (if any)
// is dispatched.
void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
{
    this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();

    if (length)
    {
        this->outputWriteQueue_.NotifyBytesWritten(length); // does the resend bit

        // NOTE(review): the reader advance happens inside the assertion macro;
        // this relies on SysAssert always evaluating its argument - confirm.
        SysAssert(this->outputBuffer_.ReaderTryGoForward(length));

        this->WriteTick();
    }
    else
    {
        this->pParent_->SendErrorBeginShutdown({});
    }
}
|
||
|
}
|