Reece Wilson
67905a4192
============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
210 lines
6.2 KiB
C++
210 lines
6.2 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: AuNetSocketChannelOutput.cpp
|
|
Date: 2022-8-21
|
|
Author: Reece
|
|
***/
|
|
#include "Networking.hpp"
|
|
#include "AuNetSocket.hpp"
|
|
#include "AuNetSocketChannelOutput.hpp"
|
|
#include "AuNetWorker.hpp"
|
|
#include <Source/IO/IOPipeProcessor.hpp>
|
|
|
|
#if defined(AURORA_IS_MODERNNT_DERIVED)
|
|
#include "AuNetStream.NT.hpp"
|
|
#else
|
|
#include "AuNetStream.Linux.hpp"
|
|
#endif
|
|
|
|
static const auto kDefaultBufferSize = 64 * 1024;
|
|
|
|
namespace Aurora::IO::Net
|
|
{
|
|
SocketChannelOutput::SocketChannelOutput(SocketBase *pParent, const AuSPtr<IAsyncTransaction> &stream) :
|
|
pParent_(pParent),
|
|
pNetWriteTransaction_(stream),
|
|
outputBuffer_(kDefaultBufferSize, true)
|
|
{
|
|
}
|
|
|
|
bool SocketChannelOutput::IsValid()
|
|
{
|
|
return bool(this->pNetWriteTransaction_) &&
|
|
bool(this->outputBuffer_);
|
|
}
|
|
|
|
AuSPtr<IAsyncTransaction> SocketChannelOutput::ToWriteTransaction()
|
|
{
|
|
return AuSPtr<IAsyncTransaction>(this->pNetWriteTransaction_);
|
|
}
|
|
|
|
AuSPtr<Memory::ByteBuffer> SocketChannelOutput::AsWritableByteBuffer()
|
|
{
|
|
return AuSPtr<Memory::ByteBuffer>(this->pParent_->SharedFromThis(),
|
|
&this->outputBuffer_);
|
|
}
|
|
|
|
void SocketChannelOutput::ScheduleOutOfFrameWrite()
|
|
{
|
|
SendIfData();
|
|
SchedWriteTick();
|
|
}
|
|
|
|
void SocketChannelOutput::SchedWriteTick()
|
|
{
|
|
auto pWorker = this->pParent_->ToWorkerEx();
|
|
|
|
if (this->pParent_->bHasFinalized_)
|
|
{
|
|
if (pWorker->IsOnThread())
|
|
{
|
|
WriteTick();
|
|
}
|
|
return;
|
|
}
|
|
|
|
auto meShared = AuSPtr<SocketChannelOutput>(this->pParent_->SharedFromThis(), this);
|
|
|
|
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
|
|
{
|
|
meShared->WriteTick();
|
|
|
|
info->OnSuccess((void *)nullptr);
|
|
}, AuStaticCast<AuAsync::PromiseCallback<AuNullS, AuNullS>>(AuMakeShared<AuAsync::PromiseCallbackFunctional<AuNullS, AuNullS>>([=](const AuSPtr<AuNullS> &dumb)
|
|
{
|
|
}))))
|
|
{
|
|
SysPushErrorIO("Couldn't schedule write tick");
|
|
this->pParent_->SendErrorBeginShutdown({});
|
|
}
|
|
}
|
|
|
|
void SocketChannelOutput::SendIfData()
|
|
{
|
|
AU_LOCK_GUARD(this->lock_);
|
|
struct View : AuMemoryViewRead
|
|
{
|
|
View(const AuMemoryViewRead &in) : AuMemoryViewRead(in)
|
|
{
|
|
}
|
|
AuSPtr<void> pin;
|
|
};
|
|
|
|
while (this->pOutSendPointer_ != this->outputBuffer_.writePtr)
|
|
{
|
|
if (this->pOutSendPointer_ == this->outputBuffer_.writePtr)
|
|
{
|
|
break;
|
|
}
|
|
|
|
if (this->pOutSendPointer_ == this->outputBuffer_.base + this->outputBuffer_.length)
|
|
{
|
|
this->pOutSendPointer_ = this->outputBuffer_.base;
|
|
continue;
|
|
}
|
|
|
|
if (!this->pOutSendPointer_)
|
|
{
|
|
this->pOutSendPointer_ = this->outputBuffer_.base;
|
|
continue;
|
|
}
|
|
|
|
if ((AuUInt8 *)this->pOutSendPointer_ > this->outputBuffer_.writePtr)
|
|
{
|
|
auto pBase = (AuUInt8 *)this->pOutSendPointer_;
|
|
auto uLen = (AuUInt8 *)this->outputBuffer_.base + this->outputBuffer_.length - pBase;
|
|
|
|
auto pView = AuMakeShared<View>(AuMemoryViewRead(pBase, uLen));
|
|
SysAssert(pView);
|
|
pView->pin = this->pParent_->SharedFromThis();
|
|
|
|
this->outputWriteQueue_.Push(pView);
|
|
|
|
this->pOutSendPointer_ = (AuUInt8 *)this->pOutSendPointer_ + uLen;
|
|
}
|
|
else
|
|
{
|
|
auto pBase = (AuUInt8 *)this->pOutSendPointer_;
|
|
auto pWriteHead = (AuUInt8 *)this->outputBuffer_.writePtr;
|
|
auto uLen = pWriteHead - pBase;
|
|
|
|
auto pView = AuMakeShared<View>(AuMemoryViewRead(pBase, uLen));
|
|
SysAssert(pView);
|
|
pView->pin = this->pParent_->SharedFromThis();
|
|
|
|
this->outputWriteQueue_.Push(pView);
|
|
this->pOutSendPointer_ = pWriteHead ;
|
|
}
|
|
}
|
|
}
|
|
|
|
void SocketChannelOutput::WriteTick()
|
|
{
|
|
AU_LOCK_GUARD(this->lock_);
|
|
|
|
#if defined(AURORA_IS_MODERNNT_DERIVED)
|
|
auto pHackTransaction =
|
|
AuStaticCast<NtAsyncNetworkTransaction>(this->pNetWriteTransaction_);
|
|
#else
|
|
auto pHackTransaction =
|
|
AuStaticCast<LinuxAsyncNetworkTransaction>(this->pNetWriteTransaction_);
|
|
#endif
|
|
|
|
if (pHackTransaction->bIsWriting)
|
|
{
|
|
// IsComplete?
|
|
if (!pHackTransaction->bLatch)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (auto pFrameToSend = this->outputWriteQueue_.Dequeue())
|
|
{
|
|
this->pNetWriteTransaction_->SetCallback(AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
|
|
|
|
if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
|
|
{
|
|
SysPushErrorIO("Couldn't begin wait");
|
|
this->pParent_->SendErrorBeginShutdown({});
|
|
return;
|
|
}
|
|
|
|
if (!this->pNetWriteTransaction_->StartWrite(0, pFrameToSend))
|
|
{
|
|
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
|
|
SysPushErrorIO("Couldn't dispatch the to-send frame");
|
|
this->pParent_->SendErrorBeginShutdown({});
|
|
return;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
this->pNetWriteTransaction_->SetCallback({});
|
|
}
|
|
}
|
|
|
|
void SocketChannelOutput::OnEndOfReadTick()
|
|
{
|
|
SendIfData();
|
|
WriteTick();
|
|
}
|
|
|
|
void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
|
|
{
|
|
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
|
|
|
|
if (!length)
|
|
{
|
|
this->pParent_->SendErrorBeginShutdown({});
|
|
return;
|
|
}
|
|
|
|
this->outputWriteQueue_.NotifyBytesWritten(length); // does the resend bit
|
|
|
|
SysAssert(this->outputBuffer_.ReaderTryGoForward(length));
|
|
|
|
WriteTick();
|
|
}
|
|
} |