AuroraRuntime/Source/IO/IOAdapterAsyncStream.cpp

696 lines
18 KiB
C++
Raw Normal View History

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterAsyncStream.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOAdapterAsyncStream.hpp"
#include "IOWaitableIOLoopSource.hpp"
namespace Aurora::IO
{
2022-06-29 13:56:59 +00:00
// Forward declaration: the reader and writer types below each hold a raw
// pointer (`parent`) to the adapter, which is defined after them.
struct AsyncStreamAdapter;
struct AsyncStreamReader : IAsyncStreamReader
{
2022-06-29 13:56:59 +00:00
AsyncStreamAdapter *parent;
EStreamError IsOpen() override;
EStreamError Read(const Memory::MemoryViewStreamWrite &parameters) override;
void Close() override;
EStreamError BeginRead(const AuSPtr<Memory::MemoryViewWrite> &internalView) override;
EStreamError Dequeue(AuUInt reqLength, Memory::MemoryViewWrite &out) override;
};
/// A writable memory view used as the buffer of an async transaction.
/// It either borrows an existing view, pins a shared view, or owns a fresh
/// allocation (`owned == true`) that the destructor releases.
struct AsyncStreamMemory : AuMemoryViewWrite
{
    AsyncStreamMemory(const AuMemoryViewWrite &ref);
    AsyncStreamMemory(const AuSPtr<AuMemoryViewWrite> &ref);
    AsyncStreamMemory(AuUInt length);
    ~AsyncStreamMemory();

    // The destructor frees `ptr` when `owned` is set, so a member-wise copy
    // would free the same allocation twice. Instances are only handled through
    // AuSPtr in this file, so copying is disabled outright.
    AsyncStreamMemory(const AsyncStreamMemory &) = delete;
    AsyncStreamMemory &operator=(const AsyncStreamMemory &) = delete;

    /// True when the view points at memory (non-null `ptr`).
    bool IsValid();

    /// Set when this object allocated `ptr` itself and must free it.
    bool owned {};
    /// Keeps a shared source view alive for as long as this object exists.
    AuSPtr<AuMemoryViewWrite> pin;
    /// Number of bytes of the last packet already handed to the consumer.
    AuUInt streamIndex {};
    // todo shared
};
struct AsyncStreamWriter : IStreamWriter
{
~AsyncStreamWriter();
2022-06-29 13:56:59 +00:00
AsyncStreamAdapter *parent;
EStreamError IsOpen() override;
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override;
void Close() override;
void Flush() override;
void Preframe();
void Frame();
AuList<AuSPtr<AsyncStreamMemory>> writesPending;
bool HasWorkItems();
bool used {};
};
2022-06-29 13:56:59 +00:00
/// Exposes a single IAsyncTransaction as a stream reader, a stream writer and a
/// waitable item. The reader/writer/source are embedded members; the To*()
/// accessors hand out AuSPtrs to them that share ownership with the adapter.
struct AsyncStreamAdapter : IAsyncStreamAdapter, AuEnableSharedFromThis<AsyncStreamAdapter>
{
    AsyncStreamAdapter();

    /// Returns the cached buffer when it is large enough, else allocates a new one.
    AuSPtr<AsyncStreamMemory> AllocateNextPageCached(AuUInt length);

    virtual AuSPtr<IAsyncStreamReader> ToStreamReader() override;
    virtual AuSPtr<IStreamWriter> ToStreamWriter() override;
    virtual AuSPtr<IIOWaitableItem> ToWaitable() override;
    virtual bool Reset() override;

    bool Init(const AuSPtr<IAsyncTransaction> &transaction, bool isStream);

    /// Buffer used by the most recent read/write; doubles as an allocation cache.
    AuSPtr<AsyncStreamMemory> lastAllocation;
    /// The transaction every read and write is issued on.
    AuSPtr<IAsyncTransaction> transaction;

    bool SetFlushOnWrite(bool value) override;
    void ReserveBuffer(AuUInt length) override;

    AuUInt GetReadOffset() override;
    AuUInt SetReadOffset(AuUInt offset) override;

    AuUInt GetWriteOffset() override;
    AuUInt SetWriteOffset(AuUInt offset) override;

    /// True from the moment a transaction is started until its result is consumed.
    bool asyncActive {};
    /// Offset passed to StartRead when `isStream` is false.
    AuUInt readOffset {};
    /// Offset passed to StartWrite when `isStream` is false.
    AuUInt writeOffset {};
    /// When true, reads and writes are always issued at offset 0.
    bool isStream {};
    /// When true, every Write() submits immediately instead of waiting for Flush().
    bool flushOnWrite {true};
    /// Error recorded by a failed transaction, if any.
    AuOptionalEx<EStreamError> errorCode;
    /// 0 = unclaimed, 1 = claimed by ToStreamWriter(), 2 = claimed by ToStreamReader().
    int locked {};

    AsyncStreamReader reader;
    AsyncStreamWriter writer;
    IOWatachableIOLoopSource source;
};
// Borrowing constructor: mirrors the pointer and length of `ref`.
// `owned` keeps its default (false), so the destructor will not free the memory.
AsyncStreamMemory::AsyncStreamMemory(const AuMemoryViewWrite &ref)
    : AuMemoryViewWrite(ref.ptr, ref.length)
{}
// Pinning constructor: mirrors the pointer and length of `*ref` and keeps a
// copy of the shared pointer so the source view outlives this object.
// A null `ref` yields an empty view (IsValid() == false) instead of
// dereferencing a null pointer.
AsyncStreamMemory::AsyncStreamMemory(const AuSPtr<AuMemoryViewWrite> &ref) :
    AuMemoryViewWrite(ref ? ref->ptr : nullptr, ref ? ref->length : 0),
    pin(ref)
{
}
// Owning constructor: zero-allocates `length` bytes and marks the view as owned
// so the destructor releases it. The page size is passed as ZAlloc's second
// argument (presumably the alignment — confirm against AuMemory::ZAlloc).
AsyncStreamMemory::AsyncStreamMemory(AuUInt length)
    : AuMemoryViewWrite(AuMemory::ZAlloc<AuUInt8*>(length, AuHwInfo::GetPageSize()), length),
      owned(true)
{}
// Releases the buffer only when this object allocated it itself.
AsyncStreamMemory::~AsyncStreamMemory()
{
    const bool mustFree = this->owned && this->ptr;
    if (!mustFree)
    {
        return;
    }

    AuMemory::Free(this->ptr);
    this->ptr = nullptr;
}
// A view is valid when it points at memory.
bool AsyncStreamMemory::IsValid()
{
    if (this->ptr)
    {
        return true;
    }
    return false;
}
2022-06-29 13:56:59 +00:00
// Updates the flush-on-write flag and returns the value it had before the call.
bool AsyncStreamAdapter::SetFlushOnWrite(bool value)
{
    const bool previous = this->flushOnWrite;
    this->flushOnWrite = value;
    return previous;
}
2022-06-29 13:56:59 +00:00
// Returns the offset the next non-stream read will be issued at.
AuUInt AsyncStreamAdapter::GetReadOffset()
{
    const AuUInt current = this->readOffset;
    return current;
}
2022-06-29 13:56:59 +00:00
// Replaces the read offset and returns the previous one.
// When the adapter is claimed by the writer (locked == 1), a completed write
// transaction is collected first via Preframe().
// NOTE(review): calling the *writer's* Preframe from the *read* offset setter
// looks like it may have been meant for SetWriteOffset — confirm intent.
AuUInt AsyncStreamAdapter::SetReadOffset(AuUInt offset)
{
    const bool claimedByWriter = (this->locked == 1);
    if (claimedByWriter)
    {
        this->writer.Preframe();
    }

    const AuUInt previous = this->readOffset;
    this->readOffset = offset;
    return previous;
}
2022-06-29 13:56:59 +00:00
// Returns the write offset. When the adapter is claimed by the writer
// (locked == 1), a completed write transaction is collected first.
AuUInt AsyncStreamAdapter::GetWriteOffset()
{
    const bool claimedByWriter = (this->locked == 1);
    if (claimedByWriter)
    {
        this->writer.Preframe();
    }

    const AuUInt current = this->writeOffset;
    return current;
}
2022-06-29 13:56:59 +00:00
// Replaces the write offset and returns the previous one.
AuUInt AsyncStreamAdapter::SetWriteOffset(AuUInt offset)
{
    const AuUInt previous = this->writeOffset;
    this->writeOffset = offset;
    return previous;
}
2022-06-29 13:56:59 +00:00
// Binds the adapter to `transaction`, wires the embedded reader and writer
// back to this adapter, and clears any cached buffer. Always returns true.
bool AsyncStreamAdapter::Init(const AuSPtr<IAsyncTransaction> &transaction, bool isStream)
{
    // Wire the embedded facades to their owner.
    this->reader.parent = this;
    this->writer.parent = this;

    // Adopt the transaction and its addressing mode.
    this->transaction = transaction;
    this->isStream    = isStream;

    // Start from a blank state: no buffer, nothing in flight.
    this->lastAllocation.reset();
    this->asyncActive = false;

    return true;
}
2022-06-29 13:56:59 +00:00
// Constructs the adapter with an empty loop source; the real loop source is
// attached later by ToWaitable().
AsyncStreamAdapter::AsyncStreamAdapter()
    : source({})
{}
2022-06-29 13:56:59 +00:00
// Returns a buffer of at least `length` bytes.
// The buffer from the previous operation is reused when it is big enough;
// otherwise a new owned buffer is allocated and becomes the cached one.
// Returns an empty pointer (after pushing a memory error) on allocation failure.
AuSPtr<AsyncStreamMemory> AsyncStreamAdapter::AllocateNextPageCached(AuUInt length)
{
    const bool cacheFits = this->lastAllocation &&
                           this->lastAllocation->length >= length;
    if (cacheFits)
    {
        return this->lastAllocation;
    }

    auto fresh = AuMakeShared<AsyncStreamMemory>(length);

    // Either the control object or the backing allocation may have failed.
    if (!fresh || !fresh->IsValid())
    {
        SysPushErrorMem();
        return {};
    }

    this->lastAllocation = fresh;
    return fresh;
}
// Reports the adapter's recorded error, or eErrorNone when none is set.
EStreamError AsyncStreamReader::IsOpen()
{
    auto &recorded = this->parent->errorCode;
    if (recorded.HasValue())
    {
        return recorded.value();
    }
    return EStreamError::eErrorNone;
}
// Starts an async read directly into the caller-supplied view.
//
// Fails with eErrorStreamInterrupted when:
//  - `internalView` is null,
//  - the previous packet has not been fully consumed,
//  - a previous transaction is still in flight,
//  - the wrapper allocation fails, or
//  - the transaction refuses to start.
// On every failure path `asyncActive` is left false so the adapter is not
// stuck believing a read is pending.
EStreamError AsyncStreamReader::BeginRead(const AuSPtr<Memory::MemoryViewWrite> &internalView)
{
    // The AsyncStreamMemory wrapper reads through this pointer; reject null
    // before touching any adapter state.
    if (!internalView)
    {
        SysPushErrorArg();
        return EStreamError::eErrorStreamInterrupted;
    }

    if (parent->lastAllocation)
    {
        auto length = parent->transaction->GetLastPacketLength();

        // Unconsumed bytes from the previous packet would be lost.
        if (length &&
            parent->lastAllocation->streamIndex != length)
        {
            AuDebugBreak();
            return EStreamError::eErrorStreamInterrupted;
        }

        parent->lastAllocation.reset();
    }

    if (parent->asyncActive && !parent->transaction->Complete())
    {
        AuDebugBreak();
        return EStreamError::eErrorStreamInterrupted;
    }

    // Async success or blank state
    parent->transaction->Reset();

    parent->lastAllocation = AuMakeShared<AsyncStreamMemory>(internalView);
    if (!parent->lastAllocation)
    {
        // Nothing was started; do not leave the adapter marked as busy.
        parent->asyncActive = false;
        AuDebugBreak();
        return EStreamError::eErrorStreamInterrupted;
    }

    parent->lastAllocation->streamIndex = 0;
    parent->asyncActive = true;

    if (!parent->transaction->StartRead(parent->isStream ? 0 : parent->readOffset, parent->lastAllocation))
    {
        parent->asyncActive = false;
        SysPushErrorNested("Couldn't start async aio read");
        return EStreamError::eErrorStreamInterrupted;
    }

    return EStreamError::eErrorNone;
}
// Exposes (and optionally consumes) bytes of the last completed read.
//
// `out` is cleared first. If the transaction has not completed, or there is
// nothing left to hand out, the call succeeds with an empty `out`.
// reqLength == 0  -> peek: `out` covers all remaining bytes, nothing is consumed.
// reqLength != 0  -> `out` covers at most reqLength bytes, which are consumed;
//                    once the packet is exhausted the transaction is reset.
// Returns the transaction/adapter error when one is pending.
EStreamError AsyncStreamReader::Dequeue(AuUInt reqLength, Memory::MemoryViewWrite &out)
{
    out = {};

    if (!parent->transaction->Complete())
    {
        return EStreamError::eErrorNone;
    }

    // Transaction error
    if (parent->transaction->Failed())
    {
        // Report the OS error before Reset(); reading it afterwards risks
        // logging a value the reset has already cleared.
        SysPushErrorIO("IO: {}", parent->transaction->GetOSErrorCode());
        parent->asyncActive = false;
        parent->transaction->Reset();
        return EStreamError::eErrorStreamInterrupted;
    }

    // Async error
    if (parent->errorCode.HasValue())
    {
        // Stream errors are sticky; offset-mode errors are consumed on report.
        auto code = parent->isStream ?
                    parent->errorCode.Value() :
                    AuExchange(parent->errorCode, {}).Value();

        if (code != EStreamError::eErrorNone)
        {
            parent->asyncActive = false;
            parent->transaction->Reset();
            return code;
        }
    }

    auto length = parent->transaction->GetLastPacketLength();
    if (!length)
    {
        parent->asyncActive = false;
        parent->transaction->Reset();
        return EStreamError::eErrorNone;
    }

    // A completed transaction without a buffer has nothing to dequeue.
    if (!parent->lastAllocation)
    {
        return EStreamError::eErrorNone;
    }

    auto streamIndex = parent->lastAllocation->streamIndex;
    // `>=` rather than `==` so `length - streamIndex` below can never wrap.
    if (streamIndex >= length)
    {
        return EStreamError::eErrorNone;
    }

    auto toRead = AuMin<AuUInt>(parent->lastAllocation->length, length - streamIndex);
    auto bRequestedLen = bool(reqLength);

    out.ptr = this->parent->lastAllocation->ToPointer() + streamIndex;
    out.length = reqLength ? AuMin(toRead, reqLength) : toRead;

    if (bRequestedLen)
    {
        streamIndex += out.length;
        parent->lastAllocation->streamIndex = streamIndex;

        if (!parent->isStream)
        {
            parent->readOffset += out.length;
        }

        if (parent->lastAllocation->streamIndex == length)
        {
            parent->asyncActive = false;
            parent->transaction->Reset();
        }
    }

    return EStreamError::eErrorNone;
}
// Non-blocking read.
//
// 1. If the last completed packet still has unread bytes, up to
//    `parameters.length` of them are copied out and reported via
//    `parameters.outVariable`. Exactly the copied byte count is consumed
//    (and, in offset mode, added to readOffset). With a null destination the
//    available count is reported but nothing is consumed.
// 2. Otherwise a pending error is returned, or — if a transaction is still in
//    flight — zero bytes are reported.
// 3. Otherwise a new async read is started and zero bytes are reported; the
//    data becomes available on a later call.
//
// Returns eErrorEndOfStream for a zero-length request and
// eErrorStreamInterrupted when a read cannot be started.
EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{
    if (!parameters.length)
    {
        SysPushErrorArg();
        return EStreamError::eErrorEndOfStream;
    }

    // Read from the last transaction, if not fully consumed
    if (parent->lastAllocation)
    {
        auto length = parent->transaction->GetLastPacketLength();
        auto &consumed = parent->lastAllocation->streamIndex;

        // `<` (not `!=`) so `length - consumed` can never wrap around.
        if (length && consumed < length)
        {
            auto toRead = AuMin<AuUInt>(parameters.length, length - consumed);

            if (parameters.ptr)
            {
                AuMemcpy(parameters.ptr, parent->lastAllocation->Begin<AuUInt8>() + consumed, toRead);

                // Advance by what was actually copied — never by the whole
                // packet — so the remainder is served by the next call.
                consumed += toRead;

                if (!parent->isStream)
                {
                    parent->readOffset += toRead;
                }
            }

            parameters.outVariable = toRead;
            return EStreamError::eErrorNone;
        }

        if (parent->transaction && parent->transaction->Failed())
        {
            SysPushErrorIO("AIO transaction read failed: {}", parent->transaction->GetOSErrorCode());
            parent->errorCode = EStreamError::eErrorStreamInterrupted;
            parent->lastAllocation.reset();
        }
    }

    // Async error
    if (parent->errorCode.HasValue())
    {
        // Stream errors are sticky; offset-mode errors are consumed on report.
        auto code = parent->isStream ?
                    parent->errorCode.Value() :
                    AuExchange(parent->errorCode, {}).Value();

        if (code != EStreamError::eErrorNone)
        {
            return code;
        }
    }

    // Async awaiting response
    if (parent->asyncActive && !parent->transaction->Complete())
    {
        parameters.outVariable = 0;
        return EStreamError::eErrorNone;
    }

    // Async success or blank state
    parent->transaction->Reset();

    parent->lastAllocation = parent->AllocateNextPageCached(parameters.length);
    if (!parent->lastAllocation)
    {
        // AllocateNextPageCached has already pushed the memory error.
        parent->asyncActive = false;
        return EStreamError::eErrorStreamInterrupted;
    }

    parent->lastAllocation->streamIndex = 0;
    parent->asyncActive = true;

    if (!parent->transaction->StartRead(parent->isStream ? 0 : parent->readOffset, parent->lastAllocation))
    {
        parent->asyncActive = false;
        SysPushErrorNested("Couldn't start async aio read");
        return EStreamError::eErrorStreamInterrupted;
    }

    // The read was only started; no bytes are available yet.
    parameters.outVariable = 0;
    return EStreamError::eErrorNone;
}
// Intentionally a no-op: the reader performs no work on close.
void AsyncStreamReader::Close()
{
}
// Flushes on destruction, but only if Write() was ever called.
AsyncStreamWriter::~AsyncStreamWriter()
{
    if (!used)
    {
        return;
    }

    Flush();
}
// Reports the adapter's recorded error, or eErrorNone when none is set.
EStreamError AsyncStreamWriter::IsOpen()
{
    auto &recorded = this->parent->errorCode;
    if (recorded.HasValue())
    {
        return recorded.value();
    }
    return EStreamError::eErrorNone;
}
// Queues a copy of the caller's bytes for an async write.
//
// The data is copied into a fresh buffer and appended to `writesPending`;
// when the adapter's flushOnWrite flag is set the queue is submitted at once.
// `parameters.outVariable` is set to the accepted byte count only after the
// copy has actually been queued, so a failed call never reports bytes written.
//
// Returns eErrorStreamInterrupted for a null source pointer or when memory
// runs out, or the adapter's pending error if one is recorded.
EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead &parameters)
{
    used = true;

    if (!parameters.ptr)
    {
        return EStreamError::eErrorStreamInterrupted;
    }

    // Collect the outcome of any finished transaction before queuing more.
    Preframe();

    if (parent->errorCode.HasValue())
    {
        // Stream errors are sticky; offset-mode errors are consumed on report.
        auto code = parent->isStream ?
                    parent->errorCode.Value() :
                    AuExchange(parent->errorCode, {}).Value();

        if (code != EStreamError::eErrorNone)
        {
            return code;
        }
    }

    // Nothing to queue; avoid asking the allocator for a zero-byte buffer.
    if (!parameters.length)
    {
        parameters.outVariable = 0;
        return EStreamError::eErrorNone;
    }

    auto newMem = AuMakeShared<AsyncStreamMemory>(parameters.length);
    if (!newMem)
    {
        SysPushErrorMem();
        return EStreamError::eErrorStreamInterrupted;
    }

    if (!newMem->IsValid())
    {
        SysPushErrorMem();
        return EStreamError::eErrorStreamInterrupted;
    }

    AuMemcpy(newMem->ptr, parameters.ptr, parameters.length);

    if (!AuTryInsert(this->writesPending, newMem))
    {
        SysPushErrorMem();
        return EStreamError::eErrorStreamInterrupted;
    }

    // Only now has the data really been accepted.
    parameters.outVariable = parameters.length;

    if (this->parent->flushOnWrite)
    {
        Frame();
    }

    return EStreamError::eErrorNone;
}
// Collects the result of a completed transaction (Preframe), then submits
// whatever is queued in writesPending (Frame).
void AsyncStreamWriter::Flush()
{
Preframe();
Frame();
}
// Closing the writer only flushes it; no other teardown happens here.
void AsyncStreamWriter::Close()
{
Flush();
}
// Collects the outcome of a completed transaction and resets it.
// On failure the OS error is logged, eErrorStreamInterrupted is recorded on
// the adapter and the cached buffer is dropped. Does nothing while the
// transaction is still incomplete.
void AsyncStreamWriter::Preframe()
{
    auto &txn = parent->transaction;

    if (!txn->Complete())
    {
        return;
    }

    if (!txn->Failed())
    {
        // Result is discarded here.
        // NOTE(review): writeOffset is not advanced by this length — confirm
        // whether that is intended.
        txn->GetLastPacketLength();
    }
    else
    {
        SysPushErrorIO("AIO transaction write failed: {}", txn->GetOSErrorCode());
        parent->errorCode = EStreamError::eErrorStreamInterrupted;
        parent->lastAllocation.reset();
    }

    txn->Reset();
}
void AsyncStreamWriter::Frame()
{
AuSPtr<AsyncStreamMemory> buffer;
if (this->writesPending.size() == 1)
{
buffer = AuMove(this->writesPending[0]);
}
else
{
AuUInt length {};
for (auto &a : this->writesPending)
{
length += a->length;
}
buffer = this->parent->AllocateNextPageCached(length);
if (!buffer)
{
return;
}
AuUInt index {};
for (auto &a : this->writesPending)
{
if (a->length + index > buffer->length)
{
SysPanic("");
}
AuMemcpy(buffer->Begin<AuUInt8>() + index, a->ptr, a->length);
index += a->length;
}
//....
}
// Async success or blank state
parent->transaction->Reset();
parent->asyncActive = true;
struct WriteMem : AuMemoryViewRead
{
AuSPtr<AsyncStreamMemory> write;
};
auto annoying = AuMakeShared<WriteMem>();
if (!annoying)
{
SysPushErrorMem();
return;
}
annoying->write = buffer;
annoying->ptr = buffer->ptr;
annoying->length = buffer->length;
parent->lastAllocation = buffer;
parent->lastAllocation->streamIndex = 0;
if (!parent->transaction->StartWrite(parent->isStream ? 0 : parent->writeOffset, annoying))
{
parent->asyncActive = false;
SysPushErrorNested("Couldn't start async aio write");
return;
}
this->writesPending.clear();
}
// True while at least one queued write has not been submitted yet.
bool AsyncStreamWriter::HasWorkItems()
{
    const auto queued = this->writesPending.size();
    return queued != 0;
}
2022-06-29 13:56:59 +00:00
// Claims the adapter for reading (locked = 2) and returns the embedded reader.
// The returned pointer is built from this adapter's shared pointer plus the
// member's address. Returns an empty pointer if the adapter is already
// claimed by the writer.
AuSPtr<IAsyncStreamReader> AsyncStreamAdapter::ToStreamReader()
{
    const bool claimable = (this->locked == 0) || (this->locked == 2);
    if (!claimable)
    {
        return {};
    }

    this->locked = 2;

    auto *pReader = &this->reader;
    return AuSPtr<IAsyncStreamReader>(AuSharedFromThis(), pReader);
}
2022-06-29 13:56:59 +00:00
// Claims the adapter for writing (locked = 1) and returns the embedded writer.
// The returned pointer is built from this adapter's shared pointer plus the
// member's address. Returns an empty pointer if the adapter is already
// claimed by the reader.
AuSPtr<IStreamWriter> AsyncStreamAdapter::ToStreamWriter()
{
    const bool claimable = (this->locked == 0) || (this->locked == 1);
    if (!claimable)
    {
        return {};
    }

    this->locked = 1;

    auto *pWriter = &this->writer;
    return AuSPtr<IStreamWriter>(AuSharedFromThis(), pWriter);
}
2022-06-29 13:56:59 +00:00
AuSPtr<IIOWaitableItem> AsyncStreamAdapter::ToWaitable()
{
[+] Network + Protocol + TLS - Initial Commit ============================================================================= Network ]==================================================================== ============================================================================= [+] Added (very) early Aurora::IO::Net implementation [+] AuNet::EHostnameType [+] AuNet::EIPProtocol [+] AuNet::ENetworkError [+] AuNet::ETransportProtocol [+] AuNet::INetInterface [+] AuNet::INetSrvDatagram [+] AuNet::INetSrvResolve [+] AuNet::INetSrvSockets [+] AuNet::INetSrvWorkers [+] AuNet::INetWorker [+] AuNet::IPAddress [+] AuNet::IResolver [+] AuNet::ISocket [+] AuNet::IResolver [+] AuNet::ISocketBase [+] AuNet::ISocketChannel [+] AuNet::ISocketDriver [+] AuNet::ISocketDriverFactory [+] AuNet::ISocketServer [+] AuNet::ISocketServerDriver [+] AuNet::NetEndpoint [+] AuNet::NetError [+] AuNet::NetHostname (+implementation) ============================================================================= Protocol ]=================================================================== ============================================================================= [+] IProtocolInterceptor [+] IProtocolInterceptorEx [+] IProtocolStack (+implementation) ============================================================================= TLS ]======================================================================== ============================================================================= [+] ITLSContext [+] TLSProtocolRecv [+] TLSProtocolSend (+implementation) ============================================================================= IO Bug Fixes ]=============================================================== ============================================================================= [*] IOProcessor::SubmitIOWorkItem should signal the CvEvent, forcing at least once future tick (wont optimize with if in tick & not yet dispatched work items) [*] Split IOPipeWork in into IOPipeProcessor header [+] 
IOPipeWork::GetBuffer (internal reallocation) [*] Harden against IAsyncTransactions without a loop source [*] Missing null `if (processor->listener)` in IOProcessor [*] Solved some soft-lock conditions under Linux's LoopQueue (added deferred commits) [*] Quick hack: IOProcessor::HasItems() should OR the early can-tick check function. ============================================================================= Other ]====================================================================== ============================================================================= [+] Linux: LSSignalCatcher [+] `static void AuResetMember(Aurora::Memory::ByteBuffer &ref)` for AuROXTL [*] Attempt to enforce a normalization and don't overwrite-readptr-under-istreamwriters policy in ByteBuffer_ReadWrite (circular buffers) [*] Bad ECC ctors ============================================================================= Known issues ]=============================================================== ============================================================================= > Linux net is nowhere near done > UDP socket emulation layer isn't implemented > Ciphersuite API is a stub > Private key API is a stub > ...therefore no TLS servers > Missing thread safety precautions under net > Net implementation is still beri early
2022-08-28 19:02:06 +00:00
auto pLoopSource = this->transaction->NewLoopSource();
if (!pLoopSource)
{
return {};
}
this->source.SetLoopSource(pLoopSource);
return AuSPtr<IIOWaitableItem>(AuSharedFromThis(), &this->source);
}
2022-06-29 13:56:59 +00:00
// Pre-allocates the transfer buffer. Skipped only when a buffer exists *and*
// a transaction is active, so an in-use buffer is never swapped out.
void AsyncStreamAdapter::ReserveBuffer(AuUInt length)
{
    const bool bufferInUse = this->lastAllocation && this->asyncActive;
    if (bufferInUse)
    {
        return;
    }

    this->lastAllocation = this->AllocateNextPageCached(length);
}
2022-06-29 13:56:59 +00:00
// Returns the adapter to an unclaimed, idle state with both offsets at zero.
//
// Refuses (returns false) while the writer still has unsubmitted data or
// while a started transaction has not completed. On success the claim is
// released, the transaction is reset and `asyncActive` is cleared so the next
// Read()/BeginRead() starts a fresh transaction instead of waiting on the one
// that was just reset.
bool AsyncStreamAdapter::Reset()
{
    if (this->locked == 1)
    {
        if (this->writer.HasWorkItems())
        {
            return false;
        }

        this->writer.Flush();
    }

    if (this->asyncActive)
    {
        if (!this->transaction->Complete())
        {
            return false;
        }
    }

    this->locked = 0;
    this->transaction->Reset();
    // Nothing is in flight after the reset above.
    this->asyncActive = false;
    this->writeOffset = 0;
    this->readOffset = 0;
    return true;
}
2022-06-29 13:56:59 +00:00
// Factory: creates an adapter bound to `transaction`.
// `isStream` selects stream addressing (all operations at offset 0) over
// offset-tracked addressing.
// Returns an empty pointer, after pushing an error, when `transaction` is
// null or the adapter cannot be allocated; also empty if Init() fails.
AUKN_SYM AuSPtr<IAsyncStreamAdapter> NewAsyncStreamAdapter(const AuSPtr<IAsyncTransaction> &transaction, bool isStream)
{
    if (!transaction)
    {
        SysPushErrorArg();
        return {};
    }

    auto adapter = AuMakeShared<AsyncStreamAdapter>();
    if (!adapter)
    {
        // Report the failure like every other allocation site in this file.
        SysPushErrorMem();
        return {};
    }

    if (!adapter->Init(transaction, isStream))
    {
        return {};
    }

    return adapter;
}
}