[*] Experimental lower-latency IO pipes

[*] revert win32 net tx hardening - do not use the Reset function to null pSocket
[*] fix a bytebuffer circular buffer path
[*] update how TLS protocol stacks can snap the stack
This commit is contained in:
Reece Wilson 2023-05-11 16:05:00 +01:00
parent 9f59d4f921
commit 1fa433540b
15 changed files with 228 additions and 80 deletions

View File

@ -296,7 +296,7 @@ namespace Aurora::Memory
}
else
{
uCount = ((this->base + this->length) - this->readPtr) - 1;
uCount = (this->base + this->length) - this->writePtr;
pBase = this->writePtr;
}
}

View File

@ -47,6 +47,11 @@ namespace Aurora::IO
void IOPipeWork::Tick_FrameEpilogue()
{
if (this->bWritingAheadLowLatency)
{
this->TryPump();
}
if (AuExchange(this->bShouldReadNext, false))
{
this->ReadNext();
@ -238,7 +243,38 @@ namespace Aurora::IO
this->buffer_.writePtr += internalBuffer.length;
TryPump();
// end of low-latency read-ahead tick
if (this->bWritingAheadLowLatency)
{
// shift if running out of linear space
auto readHead = this->buffer_.readPtr - this->buffer_.base;
if (readHead > (this->buffer_.length / 4) * 3)
{
auto readPtr = this->buffer_.base + readHead;
auto len = this->buffer_.writePtr - readPtr;
AuMemmove(this->buffer_.base, readPtr, len);
this->buffer_.writePtr = this->buffer_.base + len;
this->buffer_.readPtr = this->buffer_.base;
}
this->bWritingAheadLowLatency = false;
}
// attempt low-latency read-ahead
if (!this->bWritingAheadLowLatency &&
this->buffer_.RemainingWrite(this->uFrameCap_) /*ensure we can run ahead*/ &&
!this->IsAtRequestedEnd() /*do not preemptively terminate before the last callback is fired*/)
{
this->bWritingAheadLowLatency = true;
this->ReadNext();
// TryPump is delegated to the frame epilogue so that we can do a batched send of the next frames reads
// followed by a tick of frame[-1]
}
else
{
this->TryPump();
}
}
void IOPipeWork::StreamPump()
@ -283,7 +319,14 @@ namespace Aurora::IO
if (this->pAsyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
AuIO::EStreamError::eErrorNone)
{
TerminateOnThread(true);
if (this->bWritingAheadLowLatency)
{
this->bWritingAheadIOUOneTerminate = true;
}
else
{
TerminateOnThread(true);
}
return;
}
@ -415,49 +458,55 @@ namespace Aurora::IO
bytesProcessedTotal += bytesProcessed;
if (oldWriteHeadPtr != this->buffer_.writePtr)
if (!this->bWritingAheadLowLatency)
{
this->buffer_.writePtr = this->buffer_.base + writeHead;
}
if (oldWriteHeadPtr != this->buffer_.writePtr)
{
this->buffer_.writePtr = this->buffer_.base + writeHead;
}
if (bIsCullingLastFrame)
{
this->bShouldReadNext = false;
break;
}
if (this->buffer_.readPtr == this->buffer_.writePtr)
{
this->buffer_.readPtr = this->buffer_.base;
this->buffer_.writePtr = this->buffer_.base;
if (bIsCullingLastFrame)
{
this->bShouldReadNext = false;
break;
}
this->bShouldReadNext = true;
}
else if (!bytesProcessed)
{
this->bShouldReadNext = true;
}
if (this->buffer_.readPtr == this->buffer_.writePtr)
{
this->buffer_.readPtr = this->buffer_.base;
this->buffer_.writePtr = this->buffer_.base;
// Prevent fucky end of allocation issues by moving the tail end of a partially buffered stream back to the start
// Should help with packing massive files, where faster disks can spin through smaller frames, leaving
// the CPU to catch up towards the end of the buffer, at which point the linearity breaks.
// We must instead force linearity, and with the assumption we can move peekable memory around, we must eventually
// move the tail end of the buffer back to the start, just so we can continue that stream view linearity.
this->bShouldReadNext = true;
}
else if (!bytesProcessed)
{
this->bShouldReadNext = true;
}
// I really don't know how ReadNextAsync can be expected to wrap around a ring buffer.
// We'd need to know if this pass failed, and if the read head is near the end, it'd know to wrap back around to zero.
// An overengineered pain and liability.
// This should work
// Prevent fucky end of allocation issues by moving the tail end of a partially buffered stream back to the start
if (readHead > (this->buffer_.length / 4) * 3)
{
auto readPtr = this->buffer_.base + readHead;
auto len = this->buffer_.writePtr - readPtr;
AuMemmove(this->buffer_.base, readPtr, len);
this->buffer_.writePtr = this->buffer_.base + len;
this->buffer_.readPtr = this->buffer_.base;
// Should help with packing massive files, where faster disks can spin through smaller frames, leaving
// the CPU to catch up towards the end of the buffer, at which point the linearity breaks.
// We must instead force linearity, and with the assumption we can move peekable memory around, we must eventually
// move the tail end of the buffer back to the start, just so we can continue that stream view linearity.
// I really don't know how ReadNextAsync can be expected to wrap around a ring buffer.
// We'd need to know if this pass failed, and if the read head is near the end, it'd know to wrap back around to zero.
// An overengineered pain and liability.
// This should allow us to continue working in linear space without resorting to circular ring buffers
{
auto readHead = this->buffer_.readPtr - this->buffer_.base;
if (readHead > (this->buffer_.length / 4) * 3)
{
auto readPtr = this->buffer_.base + readHead;
auto len = this->buffer_.writePtr - readPtr;
AuMemmove(this->buffer_.base, readPtr, len);
this->buffer_.writePtr = this->buffer_.base + len;
this->buffer_.readPtr = this->buffer_.base;
}
}
}
if (this->output.type == EPipeCallbackType::eWriteToWriter)
@ -473,11 +522,15 @@ namespace Aurora::IO
}
while (AuExchange(bytesProcessed, 0));
if (!bIsCullingLastFrame)
if (!this->bWritingAheadLowLatency)
{
if (this->buffer_.readPtr == this->buffer_.base)
if (!bIsCullingLastFrame)
{
this->bShouldReadNext = true;
if (this->buffer_.readPtr == this->buffer_.base)
{
this->bShouldReadNext = true;
}
}
}
@ -492,7 +545,12 @@ namespace Aurora::IO
if (bIsCullingLastFrame)
{
TerminateOnThread(false);
this->TerminateOnThread(false);
}
if (this->bWritingAheadIOUOneTerminate)
{
this->TerminateOnThread(true);
}
return bytesProcessedTotal;

View File

@ -110,6 +110,8 @@ namespace Aurora::IO
AuByteBuffer buffer_;
Utility::ThroughputCalculator throughput_;
AuUInt bytesProcessedInterframe_ {};
bool bWritingAheadLowLatency {};
bool bWritingAheadIOUOneTerminate {};
};

View File

@ -230,6 +230,7 @@ namespace Aurora::IO
FrameRunAlerted();
FrameRunAlertedSniffers();
FrameRunIOWorkUnits();
IO::SendBatched();
FrameEndOfFrameEvents();
return FrameFinalize();
}

View File

@ -350,7 +350,7 @@ namespace Aurora::IO::FS
SetEvent(lpOverlapped->hEvent);
auto pStupid = AuExchange(hold->pMemoryHold, {});
hold->CompleteEx(dwNumberOfBytesTransfered);
hold->CompleteEx(dwNumberOfBytesTransfered, true);
}
bool NtAsyncFileTransaction::IDontWannaUsePorts()
@ -517,7 +517,7 @@ namespace Aurora::IO::FS
return this->bHasFailed ? this->dwOsErrorCode : ERROR_SUCCESS;
}
bool NtAsyncFileTransaction::CompleteEx(AuUInt completeRoutine)
bool NtAsyncFileTransaction::CompleteEx(AuUInt completeRoutine, bool bForce)
{
DWORD read {};
@ -540,7 +540,7 @@ namespace Aurora::IO::FS
if (::GetOverlappedResult(this->pHandle_->handle,
&this->overlap,
&read,
false) && read)
false) && (read || bForce))
{
DispatchCb(read);
return true;

View File

@ -60,7 +60,7 @@ namespace Aurora::IO::FS
bool Complete() override;
bool CompleteEx(AuUInt completeRoutine);
bool CompleteEx(AuUInt completeRoutine, bool bForce = false);
bool Failed() override;
AuUInt GetOSErrorCode() override;

View File

@ -594,8 +594,11 @@ namespace Aurora::IO::Net
auto pWriteTransaction = this->socketChannel_.outputChannel.ToWriteTransaction();
#if defined(AURORA_IS_MODERNNT_DERIVED)
if (pWriteTransaction)
{
AuStaticCast<NtAsyncNetworkTransaction>(pWriteTransaction)->MakeSyncable();
AuStaticCast<NtAsyncNetworkTransaction>(pWriteTransaction)->ForceNextWriteWait();
}
#endif
this->SendOnData();
@ -604,7 +607,25 @@ namespace Aurora::IO::Net
this->socketChannel_.pSendProtocol.reset();
this->socketChannel_.inputChannel.pNetReader.reset();
this->socketChannel_.inputChannel.pNetReadTransaction->Reset();
#if defined(AURORA_IS_MODERNNT_DERIVED)
if (this->socketChannel_.inputChannel.pNetReadTransaction)
{
AuStaticCast<NtAsyncNetworkTransaction>(this->socketChannel_.inputChannel.pNetReadTransaction)->pSocket = nullptr;
}
this->socketChannel_.inputChannel.pNetReadTransaction.reset();
if (pWriteTransaction)
{
AuStaticCast<NtAsyncNetworkTransaction>(pWriteTransaction)->pSocket = nullptr;
}
this->socketChannel_.outputChannel.pParent_ = nullptr;
#else
// TODO:
#endif
this->CloseSocket();
}

View File

@ -74,9 +74,13 @@ namespace Aurora::IO::Net
{
return this->pRecvProtocol->AsStreamReader();
}
else if (this->pCachedReader)
{
return this->pCachedReader;
}
else
{
return AuMakeShared<IO::Buffered::BlobReader>(this->AsReadableByteBuffer());
return this->pCachedReader = AuMakeSharedThrow<IO::Buffered::BlobReader>(this->AsReadableByteBuffer());
}
}
@ -98,9 +102,13 @@ namespace Aurora::IO::Net
{
return this->pSendProtocol->AsStreamWriter();
}
else if (this->pCachedWriter)
{
return this->pCachedWriter;
}
else
{
return AuMakeSharedThrow<IO::Buffered::BlobWriter>(this->AsWritableByteBuffer());
return this->pCachedWriter = AuMakeSharedThrow<IO::Buffered::BlobWriter>(this->AsWritableByteBuffer());
}
}
@ -121,6 +129,7 @@ namespace Aurora::IO::Net
auto pProtocol = AuMakeShared<Protocol::ProtocolStack>();
if (!pProtocol)
{
SysPushErrorMemory();
return {};
}
@ -134,6 +143,7 @@ namespace Aurora::IO::Net
auto pBaseProtocol = Protocol::NewBufferedProtocolStack(this->uBytesOutputBuffer);
if (!pBaseProtocol)
{
SysPushErrorMemory();
return {};
}
@ -151,11 +161,13 @@ namespace Aurora::IO::Net
void SocketChannel::SpecifyRecvProtocol(const AuSPtr<Protocol::IProtocolStack> &pRecvProtocol)
{
this->pRecvProtocol = pRecvProtocol;
this->pCachedReader.reset();
}
void SocketChannel::SpecifySendProtocol(const AuSPtr<Protocol::IProtocolStack> &pSendProtocol)
{
this->pSendProtocol = pSendProtocol;
this->pCachedWriter.reset();
}
AuSPtr<ISocketStats> SocketChannel::GetRecvStats()

View File

@ -108,6 +108,9 @@ namespace Aurora::IO::Net
AuSPtr<Protocol::IProtocolStack> pRecvProtocol;
AuSPtr<Protocol::IProtocolStack> pSendProtocol;
AuSPtr<IO::Buffered::BlobReader> pCachedReader;
AuSPtr<IO::Buffered::BlobWriter> pCachedWriter;
AuThreadPrimitives::SpinLock spinLock;
AuList<AuSPtr<ISocketChannelEventListener>> eventListeners;

View File

@ -93,18 +93,27 @@ namespace Aurora::IO::Net
void SocketChannelInput::OnEstablish()
{
auto sharedThis = AuSPtr<SocketChannelInput>(this->pParent_->SharedFromThis(), this);
if (!this->pNetReader)
auto pReader = this->pNetReader;
if (!pReader)
{
return;
}
if (!this->pNetReader->Start())
if (!pReader->Start())
{
this->pParent_->SendErrorNoStream({});
return;
}
this->pNetReadTransaction->SetCallback(sharedThis);//; AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
auto pTX = this->pNetReadTransaction;
if (!pTX)
{
this->pParent_->SendErrorNoStream({});
return;
}
pTX->SetCallback(sharedThis);//; AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
IncrementWorker();
}

View File

@ -42,6 +42,11 @@ namespace Aurora::IO::Net
AuSPtr<Memory::ByteBuffer> SocketChannelOutput::AsWritableByteBuffer()
{
if (!this->pParent_)
{
return {};
}
return AuSPtr<Memory::ByteBuffer>(this->pParent_->SharedFromThis(),
&this->outputBuffer_);
}
@ -54,6 +59,11 @@ namespace Aurora::IO::Net
void SocketChannelOutput::SchedWriteTick()
{
if (!this->pParent_)
{
return;
}
auto pWorker = this->pParent_->ToWorkerEx();
if (this->pParent_->bHasFinalized_)
@ -84,6 +94,12 @@ namespace Aurora::IO::Net
void SocketChannelOutput::SendIfData()
{
AU_LOCK_GUARD(this->lock_);
if (!this->pParent_)
{
return;
}
struct View : AuMemoryViewRead
{
View(const AuMemoryViewRead &in) : AuMemoryViewRead(in)
@ -150,7 +166,10 @@ namespace Aurora::IO::Net
if (bShouldShutdown)
{
this->pParent_->Shutdown(true);
if (this->pParent_)
{
this->pParent_->Shutdown(true);
}
}
}
@ -185,21 +204,24 @@ namespace Aurora::IO::Net
if (auto pFrameToSend = this->outputWriteQueue_.Dequeue())
{
this->pNetWriteTransaction_->SetCallback(AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
if (this->pParent_)
{
SysPushErrorIO("Couldn't begin wait");
this->pParent_->SendErrorBeginShutdown({});
return false;
}
this->pNetWriteTransaction_->SetCallback(AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
if (!this->pNetWriteTransaction_->StartWrite(0, pFrameToSend))
{
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
SysPushErrorIO("Couldn't dispatch the to-send frame");
this->pParent_->SendErrorBeginShutdown({});
return false;
if (!this->pParent_->ToWorkerEx()->IncrementIOEventTaskCounter())
{
SysPushErrorIO("Couldn't begin wait");
this->pParent_->SendErrorBeginShutdown({});
return false;
}
if (!this->pNetWriteTransaction_->StartWrite(0, pFrameToSend))
{
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
SysPushErrorIO("Couldn't dispatch the to-send frame");
this->pParent_->SendErrorBeginShutdown({});
return false;
}
}
}
else
@ -232,6 +254,11 @@ namespace Aurora::IO::Net
void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
{
if (!this->pParent_)
{
return;
}
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->GetSendStatsEx().AddBytes(length);

View File

@ -49,7 +49,7 @@ namespace Aurora::IO::Net
SetEvent(lpOverlapped->hEvent);
auto pHold = AuExchange(hold->pMemoryHold, {});
hold->CompleteEx(cbTransferred);
hold->CompleteEx(cbTransferred, true);
}
bool NtAsyncNetworkTransaction::StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView)
@ -292,7 +292,7 @@ namespace Aurora::IO::Net
return CompleteEx(0);
}
bool NtAsyncNetworkTransaction::CompleteEx(AuUInt completeRoutine)
bool NtAsyncNetworkTransaction::CompleteEx(AuUInt completeRoutine, bool bForce)
{
DWORD read {};
@ -305,10 +305,19 @@ namespace Aurora::IO::Net
if (!this->pMemoryHold)
{
if (bForce)
{
if (!AuExchange(this->bSendEOSOnce, true))
{
DispatchCb(0);
return true;
}
}
return false;
}
if (::GetOverlappedResult((HANDLE)this->GetSocket(), &this->overlap, &read, false) && read)
if (::GetOverlappedResult((HANDLE)this->GetSocket(), &this->overlap, &read, false) && (read || bForce))
{
DispatchCb(read);
return true;
@ -393,7 +402,6 @@ namespace Aurora::IO::Net
this->dwLastBytes = 0;
this->dwLastAbstractStat = 0;
this->pSocket = nullptr;
}
void NtAsyncNetworkTransaction::MakeSyncable()

View File

@ -26,7 +26,7 @@ namespace Aurora::IO::Net
bool Complete() override;
bool CompleteEx(AuUInt completeRoutine);
bool CompleteEx(AuUInt completeRoutine, bool bForce = false);
bool Failed() override;
bool HasErrorCode();

View File

@ -33,11 +33,17 @@ namespace Aurora::IO::TLS
return true;
}
if (!this->DoOneTick(pWriteOutByteBuffer))
do
{
this->pReadInByteBuffer.reset();
return true;
this->bHasRead = false;
if (!this->DoOneTick(pWriteOutByteBuffer))
{
this->pReadInByteBuffer.reset();
return false;
}
}
while (this->bHasRead);
this->pReadInByteBuffer.reset();
@ -133,7 +139,8 @@ namespace Aurora::IO::TLS
// ...
if (!uCount)
{
return true;
SysPushErrorCrypto("A TLS tick failed because there was not enough space in the output buffer");
return false;
}
// mbedtls tick
@ -153,7 +160,7 @@ namespace Aurora::IO::TLS
{
this->pParent_->OnClose();
// mbedtls doesn't know about peeking. their os doesn't support it. wont be added for linux+nt.
return true;
return false;
}
if (iRet < 0)

View File

@ -20,12 +20,12 @@ namespace Aurora::IO::TLS
{
if (!this->pParent_->bIsAlive)
{
return false;
return true;
}
if (pReadInByteBuffer->writePtr == pReadInByteBuffer->readPtr)
{
return false;
return true;
}
while (pReadInByteBuffer->writePtr != pReadInByteBuffer->readPtr)