AuroraRuntime/Source/IO/AuIOPipeProcessor.cpp

679 lines
20 KiB
C++
Raw Normal View History

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOPipeProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "AuIOProcessor.hpp"
#include "AuIOPipeProcessor.hpp"
#include <Aurora/IO/Protocol/Protocol.hpp>
namespace Aurora::IO
{
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestAIO &request) :
parent_(parent),
request(request),
startCallback(this),
endCallback(this),
output(request.output)
{
2022-08-29 15:46:46 +00:00
this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize;
this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize;
this->uBytesWrittenLimit_ = request.uLengthOrZero;
this->uBytesWrittenTarget_ = request.uMinBytesToRead ? request.uMinBytesToRead : request.uLengthOrZero;
2022-08-29 15:46:46 +00:00
this->pAsyncTransaction_ = request.pAsyncTransaction;
this->pAsyncAdapter_ = NewAsyncStreamAdapter(request.pAsyncTransaction, request.bIsStream);
SysAssert(this->pAsyncAdapter_);
this->pAsyncAdapter_->SetReadOffset(request.uStartOffset);
this->pAsyncAdapter_->SetWriteOffset(request.uStartOffset);
2022-08-29 15:46:46 +00:00
this->pAsyncStreamReader_ = this->pAsyncAdapter_->ToStreamReader();
}
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request) :
parent_(parent),
request(request),
startCallback(this),
endCallback(this),
output(request.output)
{
2022-08-29 15:46:46 +00:00
this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize;
this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize;
this->uBytesWrittenLimit_ = request.uLengthOrZero;
this->uBytesWrittenTarget_ = request.uMinBytesToRead ? request.uMinBytesToRead : request.uLengthOrZero;
}
void IOPipeWork::Tick_FrameEpilogue()
{
if (this->bWritingAheadLowLatency)
{
this->TryPump();
}
if (AuExchange(this->bShouldReadNext, false))
{
this->ReadNext();
}
}
void IOPipeWork::Tick_Any()
{
2022-08-29 15:46:46 +00:00
if (this->pAsyncTransaction_)
{
this->AsyncPump();
}
else
{
this->StreamPump();
}
}
void IOPipeWork::OnFailureCompletion()
{
this->TerminateOnThread(true);
}
void IOPipeWork::OnNominalCompletion()
{
this->TerminateOnThread();
}
////////////////////////////////////////////////////////////
IOWorkStart::IOWorkStart(IOPipeWork *parent) : parent(parent)
{
}
void IOWorkStart::OnRun()
{
parent->RunOnThread();
}
void IOWorkStart::OnCanceled()
{
parent->TerminateOnThread();
}
////////////////////////////////////////////////////////////
IOWorkEnd::IOWorkEnd(IOPipeWork *parent) : parent(parent)
{
}
void IOWorkEnd::OnRun()
{
parent->TerminateOnThread();
}
void IOWorkEnd::OnCanceled()
{
parent->TerminateOnThread();
}
////////////////////////////////////////////////////////////
bool IOPipeWork::Start()
{
AuSPtr<IIOProcessorItem> ret;
2022-08-29 15:46:46 +00:00
if (this->pAsyncTransaction_)
{
2022-08-29 15:46:46 +00:00
auto pWaitable = this->pAsyncAdapter_->ToWaitable();
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
if (pWaitable)
{
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
ret = this->parent_->parent_->StartIOWatch(pWaitable, AuSharedFromThis());
if (!ret)
{
return false;
}
}
}
else
{
2022-08-29 15:46:46 +00:00
ret = this->parent_->parent_->StartIOWatch(this->input_.pWatchItem, AuSharedFromThis());
if (!ret)
{
return false;
}
}
2022-08-29 15:46:46 +00:00
this->pWatch = ret;
if (this->parent_->parent_->CheckThread())
{
RunOnThread();
return true;
}
return this->parent_->
parent_->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
&this->startCallback));
}
bool IOPipeWork::End()
{
if (this->parent_->parent_->CheckThread())
{
TerminateOnThread();
return true;
}
return this->parent_->
parent_->SubmitIOWorkItem(AuSPtr<IIOProcessorWorkUnit>(this->SharedFromThis(),
&this->endCallback));
}
AuInt64 IOPipeWork::GetLastTickMS()
{
return this->throughput_.GetLastFrameTimeWall();
}
AuInt64 IOPipeWork::GetStartTickMS()
{
return this->iStartTickMS_;
}
double IOPipeWork::GetPredictedThroughput()
{
return this->throughput_.GetEstimatedHertz();
}
AuUInt64 IOPipeWork::GetBytesProcessed()
{
return this->uBytesWritten_;
}
AuUInt64 IOPipeWork::GetBytesProcessedInterframe()
{
return this->uBytesWritten_ + this->bytesProcessedInterframe_;
}
void IOPipeWork::PrepareStream()
{
if (!this->buffer_.IsEmpty())
{
return;
}
2022-08-29 15:46:46 +00:00
if (!this->buffer_.Allocate(this->uBufferSize_, AuHwInfo::GetPageSize(), true))
{
SysPushErrorMem();
TerminateOnThread(true);
return;
}
}
void IOPipeWork::PrepareAsync()
{
PrepareStream();
ReadNext();
}
void IOPipeWork::AsyncPump()
{
AuMemoryViewWrite internalBuffer;
2022-08-29 15:46:46 +00:00
auto err = this->pAsyncStreamReader_->Dequeue(0, internalBuffer);
if (err != EStreamError::eErrorNone)
{
SysPushErrorIO("Async Stream Error: {}", err);
TerminateOnThread(true);
return;
}
if (internalBuffer.length == 0)
{
// end of stream
TerminateOnThread(false);
return;
}
2022-08-29 15:46:46 +00:00
err = this->pAsyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer);
if (err != EStreamError::eErrorNone)
{
SysPushErrorIO("Async Stream Error: {}", err);
TerminateOnThread(true);
return;
}
this->buffer_.writePtr += internalBuffer.length;
// end of low-latency read-ahead tick
if (this->bWritingAheadLowLatency)
{
// shift if running out of linear space
auto readHead = this->buffer_.readPtr - this->buffer_.base;
if (readHead > (this->buffer_.length / 4) * 3)
{
auto readPtr = this->buffer_.base + readHead;
auto len = this->buffer_.writePtr - readPtr;
AuMemmove(this->buffer_.base, readPtr, len);
this->buffer_.writePtr = this->buffer_.base + len;
this->buffer_.readPtr = this->buffer_.base;
}
this->bWritingAheadLowLatency = false;
}
// attempt low-latency read-ahead
if (!this->bWritingAheadLowLatency &&
2023-05-11 17:31:47 +00:00
this->buffer_.CanWrite(this->uFrameCap_) /*ensure we can run ahead*/ &&
!this->IsAtRequestedEnd() /*do not preemptively terminate before the last callback is fired*/)
{
this->bWritingAheadLowLatency = true;
this->ReadNext();
// TryPump is delegated to the frame epilogue so that we can do a batched send of the next frames reads
// followed by a tick of frame[-1]
}
else
{
this->TryPump();
}
}
void IOPipeWork::StreamPump()
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
AuUInt read {};
try
{
2022-08-29 15:46:46 +00:00
if (this->input_.pReader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) !=
AuIO::EStreamError::eErrorNone)
{
TerminateOnThread();
return;
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer_.writePtr += read;
if (this->buffer_.writePtr == this->buffer_.base + this->buffer_.length)
{
this->buffer_.writePtr = this->buffer_.base;
}
TryPump();
}
void IOPipeWork::ReadNextAsync()
{
try
{
AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
2022-08-29 15:46:46 +00:00
canBuffer = AuMin(AuUInt(this->uFrameCap_), canBuffer);
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer);
2022-08-29 15:46:46 +00:00
if (this->pAsyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
AuIO::EStreamError::eErrorNone)
{
if (this->bWritingAheadLowLatency)
{
this->bWritingAheadIOUOneTerminate = true;
}
else
{
TerminateOnThread(true);
}
return;
}
if (!this->nextWriteAsync_)
{
TerminateOnThread();
return;
}
}
catch (...)
{
SysPushErrorCatch();
}
}
void IOPipeWork::ReadNext()
{
if (!this->bActive)
{
return;
}
if (IsAtRequestedEnd())
{
TerminateOnThread(false);
return;
}
2022-08-29 15:46:46 +00:00
if (this->pAsyncTransaction_)
{
ReadNextAsync();
}
else
{
2022-08-29 15:46:46 +00:00
if (this->input_.pBackend)
{
2022-08-29 15:46:46 +00:00
this->input_.pBackend->OnEndPump();
}
}
}
AuUInt32 IOPipeWork::TryPump()
{
AuUInt &bytesProcessedTotal = this->bytesProcessedInterframe_;
AuUInt bytesProcessed {};
bool bIsCullingLastFrame {};
bytesProcessedTotal = 0;
do
{
AuUInt canRead2 = this->buffer_.RemainingBytes();
AuUInt canRead = canRead2;
if (!canRead)
{
break;
}
canRead = AuMin<AuUInt>(canRead, (this->buffer_.length + this->buffer_.base) - this->buffer_.readPtr);
if (!canRead)
{
continue;
}
auto oldReadHeadPtr = this->buffer_.readPtr;
auto readHead = oldReadHeadPtr - this->buffer_.base;
auto oldWriteHeadPtr = this->buffer_.writePtr;
auto writeHead = oldWriteHeadPtr - this->buffer_.base;
auto uInterframeProgress = this->GetBytesProcessedInterframe();
2023-04-25 09:09:37 +00:00
if (bIsCullingLastFrame = (this->uBytesWrittenLimit_ && canRead2 + uInterframeProgress > this->uBytesWrittenLimit_))
{
auto uLastFrameBytes = this->uBytesWrittenLimit_ - uInterframeProgress;
auto uAbsDataToRead = AuMin<AuUInt>(canRead, uLastFrameBytes);
this->buffer_.writePtr = this->buffer_.readPtr + uAbsDataToRead;
}
if (this->pProtocolStack)
{
this->pProtocolStack->DoTick();
}
try
{
if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart)
{
if (this->output.handleBufferedStream.pOnData)
{
if (!this->output.handleBufferedStream.pOnData->OnDataAvailable(this->buffer_))
{
bytesProcessed = 0;
this->buffer_.readPtr = this->buffer_.base + readHead;
}
else
{
bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
}
}
else
{
bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
}
}
else if (this->output.type == EPipeCallbackType::eWriteToWriter)
{
2023-06-04 19:49:13 +00:00
if (this->output.forwardStream.pIntercepter &&
this->output.forwardStream.pWriter)
{
if (!this->output.forwardStream.pIntercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.pWriter))
{
bytesProcessed = 0;
}
else
{
this->buffer_.readPtr += bytesProcessed;
}
}
else
{
bytesProcessed = this->buffer_.readPtr - oldReadHeadPtr;
}
}
}
catch (...)
{
SysPushErrorCatch();
}
bytesProcessedTotal += bytesProcessed;
if (!this->bWritingAheadLowLatency)
{
if (oldWriteHeadPtr != this->buffer_.writePtr)
{
this->buffer_.writePtr = this->buffer_.base + writeHead;
}
if (bIsCullingLastFrame)
{
this->bShouldReadNext = false;
break;
}
if (this->buffer_.readPtr == this->buffer_.writePtr)
{
this->buffer_.readPtr = this->buffer_.base;
this->buffer_.writePtr = this->buffer_.base;
this->bShouldReadNext = true;
}
else if (!bytesProcessed)
{
this->bShouldReadNext = true;
}
// Prevent fucky end of allocation issues by moving the tail end of a partially buffered stream back to the start
// Should help with packing massive files, where faster disks can spin through smaller frames, leaving
// the CPU to catch up towards the end of the buffer, at which point the linearity breaks.
// We must instead force linearity, and with the assumption we can move peekable memory around, we must eventually
// move the tail end of the buffer back to the start, just so we can continue that stream view linearity.
// I really don't know how ReadNextAsync can be expected to wrap around a ring buffer.
// We'd need to know if this pass failed, and if the read head is near the end, it'd know to wrap back around to zero.
// An overengineered pain and liability.
// This should allow us to continue working in linear space without resorting to circular ring buffers
{
auto readHead = this->buffer_.readPtr - this->buffer_.base;
if (readHead > (this->buffer_.length / 4) * 3)
{
auto readPtr = this->buffer_.base + readHead;
auto len = this->buffer_.writePtr - readPtr;
AuMemmove(this->buffer_.base, readPtr, len);
this->buffer_.writePtr = this->buffer_.base + len;
this->buffer_.readPtr = this->buffer_.base;
}
}
}
if (this->output.type == EPipeCallbackType::eWriteToWriter)
{
if (this->output.forwardStream.bFlushWriter)
{
if (this->output.forwardStream.pWriter)
{
this->output.forwardStream.pWriter->Flush();
}
}
}
}
while (AuExchange(bytesProcessed, 0));
if (!this->bWritingAheadLowLatency)
{
if (!bIsCullingLastFrame)
{
if (this->buffer_.readPtr == this->buffer_.base)
{
this->bShouldReadNext = true;
}
}
}
2022-08-29 15:46:46 +00:00
this->uBytesWritten_ += bytesProcessedTotal;
this->throughput_.OnUpdate(bytesProcessedTotal);
2022-08-29 15:46:46 +00:00
if (this->request.pListener)
{
2022-08-29 15:46:46 +00:00
this->request.pListener->OnPipePartialEvent(bytesProcessedTotal);
}
if (bIsCullingLastFrame)
{
this->TerminateOnThread(false);
}
if (this->bWritingAheadIOUOneTerminate)
{
this->TerminateOnThread(true);
}
return bytesProcessedTotal;
}
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
bool IOPipeWork::IsAtRequestedEnd()
{
2022-08-29 15:46:46 +00:00
return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_);
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
}
AuByteBuffer *IOPipeWork::GetBuffer()
{
return &this->buffer_;
}
void IOPipeWork::RunOnThread()
{
2022-08-29 15:46:46 +00:00
if (this->input_.pBackend)
{
2022-08-29 15:46:46 +00:00
this->input_.pBackend->OnStart();
}
2022-08-29 15:46:46 +00:00
if (this->pAsyncTransaction_)
{
PrepareAsync();
}
else
{
PrepareStream();
}
}
void IOPipeWork::TerminateOnThread(bool error)
{
if (!this->bActive)
{
return;
}
this->bActive = false;
2023-06-04 19:49:13 +00:00
if (this->output.type == EPipeCallbackType::eWriteToWriter)
{
if (this->output.forwardStream.bCloseWriter)
{
if (auto pWriter = this->output.forwardStream.pWriter)
{
pWriter->Close();
}
}
}
2022-08-29 15:46:46 +00:00
if (this->pWatch)
{
2022-08-29 15:46:46 +00:00
this->pWatch->StopWatch();
}
2022-08-29 15:46:46 +00:00
if (this->request.pListener)
{
if (error)
{
// We explicitly failed...
2022-08-29 15:46:46 +00:00
this->request.pListener->OnPipeFailureEvent();
}
else if (this->uBytesWrittenTarget_ && (this->uBytesWrittenTarget_ > this->uBytesWritten_))
{
// Finished without error early
2022-08-29 15:46:46 +00:00
this->request.pListener->OnPipeFailureEvent();
}
else
{
// We finished...
2022-08-29 15:46:46 +00:00
this->request.pListener->OnPipeSuccessEvent();
}
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
2022-08-29 15:46:46 +00:00
this->request.pListener.reset();
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
}
2022-08-29 15:46:46 +00:00
this->output.handleBufferedStream.pOnData.reset();
this->output.forwardStream.pIntercepter.reset();
this->output.forwardStream.pWriter.reset();
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
2022-08-29 15:46:46 +00:00
if (this->input_.pBackend)
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
{
2022-08-29 15:46:46 +00:00
this->input_.pBackend->OnEnd();
this->input_.pBackend.reset();
}
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
2022-08-29 15:46:46 +00:00
if (auto transaction = this->pAsyncTransaction_)
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
{
transaction->Reset();
2022-08-29 15:46:46 +00:00
this->pAsyncTransaction_.reset();
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
}
2022-08-29 15:46:46 +00:00
this->pAsyncAdapter_.reset();
this->pAsyncStreamReader_.reset();
if (this->pProtocolStack)
{
this->pProtocolStack->Destroy();
this->pProtocolStack.reset();
}
this->PrivateUserDataClear();
}
IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) :
parent_(parent)
{
}
AuSPtr<IIOPipeWork> IOPipeProcessor::NewBasicPipe(const IOPipeRequestBasic &request)
{
// TODO: Incomplete
return AuMakeShared<IOPipeWork>(AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor()), request);
}
AuSPtr<IIOPipeWork> IOPipeProcessor::NewAIOPipe(const IOPipeRequestAIO &request)
{
// TODO: Incomplete
return AuMakeShared<IOPipeWork>(AuStaticCast<IOPipeProcessor>(this->parent_->ToPipeProcessor()), request);
}
}