/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IOAdapterAsyncStream.cpp Date: 2022-6-6 Author: Reece ***/ #include #include #include "IOAdapterAsyncStream.hpp" #include "IOWaitableIOLoopSource.hpp" namespace Aurora::IO { struct AsyncStreamAdapater; struct AsyncStreamReader : IAsyncStreamReader { AsyncStreamAdapater *parent; EStreamError IsOpen() override; EStreamError Read(const Memory::MemoryViewStreamWrite ¶meters) override; void Close() override; EStreamError BeginRead(const AuSPtr &internalView) override; EStreamError Dequeue(AuUInt reqLength, Memory::MemoryViewWrite &out) override; }; struct AsyncStreamMemory : AuMemoryViewWrite { AsyncStreamMemory(const AuMemoryViewWrite &ref); AsyncStreamMemory(const AuSPtr &ref); AsyncStreamMemory(AuUInt length); ~AsyncStreamMemory(); bool IsValid(); AuSPtr pin; AuUInt streamIndex {}; // todo shared }; struct AsyncStreamWriter : IStreamWriter { ~AsyncStreamWriter(); AsyncStreamAdapater *parent; EStreamError IsOpen() override; EStreamError Write(const Memory::MemoryViewStreamRead ¶meters) override; void Close() override; void Flush() override; void Preframe(); void Frame(); AuList> writesPending; bool HasWorkItems(); bool used {}; }; struct AsyncStreamAdapater : IAsyncStreamAdapater, AuEnableSharedFromThis { AsyncStreamAdapater(); AuSPtr AllocateNextPageCached(AuUInt length); virtual AuSPtr ToStreamReader() override; virtual AuSPtr ToStreamWriter() override; virtual AuSPtr ToWaitable() override; virtual bool Reset() override; bool Init(const AuSPtr &transaction, bool isStream); AuSPtr lastAllocation; AuSPtr transaction; bool SetFlushOnWrite(bool value) override; void ReserveBuffer(AuUInt length) override; AuUInt GetReadOffset(); AuUInt SetReadOffset(AuUInt offset); AuUInt GetWriteOffset(); AuUInt SetWriteOffset(AuUInt offset); bool asyncActive {}; AuUInt readOffset {}; AuUInt writeOffset {}; bool isStream {}; bool flushOnWrite {true}; AuOptionalEx errorCode; int locked {}; AsyncStreamReader reader; AsyncStreamWriter writer; IOWatachableIOLoopSource source; }; AsyncStreamMemory::AsyncStreamMemory(const AuMemoryViewWrite &ref) : AuMemoryViewWrite(ref.ptr, ref.length) { } AsyncStreamMemory::AsyncStreamMemory(const AuSPtr &ref) : AuMemoryViewWrite(ref->ptr, ref->length), pin(ref) { } AsyncStreamMemory::AsyncStreamMemory(AuUInt length) : AuMemoryViewWrite(AuMemory::ZAlloc(length), length) { } AsyncStreamMemory::~AsyncStreamMemory() { if (this->ptr) { AuMemory::Free(this->ptr); this->ptr = nullptr; } } bool AsyncStreamMemory::IsValid() { return bool(this->ptr); } bool AsyncStreamAdapater::SetFlushOnWrite(bool value) { return AuExchange(this->flushOnWrite, value); } AuUInt AsyncStreamAdapater::GetReadOffset() { return this->readOffset; } AuUInt AsyncStreamAdapater::SetReadOffset(AuUInt offset) { if (this->locked == 1) { this->writer.Preframe(); } return AuExchange(this->readOffset, offset); } AuUInt AsyncStreamAdapater::GetWriteOffset() { if (this->locked == 1) { this->writer.Preframe(); } return this->writeOffset; } AuUInt AsyncStreamAdapater::SetWriteOffset(AuUInt offset) { return AuExchange(this->writeOffset, offset); } bool AsyncStreamAdapater::Init(const AuSPtr &transaction, bool isStream) { this->transaction = transaction; this->lastAllocation.reset(); this->asyncActive = false; this->reader.parent = this; this->writer.parent = this; return true; } AsyncStreamAdapater::AsyncStreamAdapater() : source({}) { } AuSPtr AsyncStreamAdapater::AllocateNextPageCached(AuUInt length) { if (this->lastAllocation) { if (this->lastAllocation->length >= length) { return this->lastAllocation; } } auto newMem = AuMakeShared(length); if (!newMem) { SysPushErrorMem(); return {}; } if (!newMem->IsValid()) { SysPushErrorMem(); return {}; } return this->lastAllocation = newMem; } EStreamError AsyncStreamReader::IsOpen() { return this->parent->errorCode.HasValue() ? this->parent->errorCode.value() : EStreamError::eErrorNone; } EStreamError AsyncStreamReader::BeginRead(const AuSPtr &internalView) { if (parent->lastAllocation) { auto length = parent->transaction->GetLastPacketLength(); if (length && parent->lastAllocation->streamIndex != length) { AuDebugBreak(); return EStreamError::eErrorStreamInterrupted; } parent->lastAllocation.reset(); } if (parent->asyncActive && !parent->transaction->Complete()) { AuDebugBreak(); return EStreamError::eErrorStreamInterrupted; } // Async success or blank state parent->transaction->Reset(); parent->asyncActive = true; parent->lastAllocation = AuMakeShared(internalView); if (!parent->lastAllocation) { AuDebugBreak(); return EStreamError::eErrorStreamInterrupted; } parent->lastAllocation->streamIndex = 0; if (!parent->transaction->StartRead(parent->isStream ? 0 : parent->readOffset, parent->lastAllocation)) { parent->asyncActive = false; SysPushErrorNested("Couldn't start async aio read"); return EStreamError::eErrorStreamInterrupted; } return EStreamError::eErrorNone; } EStreamError AsyncStreamReader::Dequeue(AuUInt reqLength, Memory::MemoryViewWrite &out) { out = {}; // Transaction error if (parent->transaction->Failed()) { parent->asyncActive = false; parent->transaction->Reset(); return EStreamError::eErrorStreamInterrupted; } // Async error if (parent->errorCode.HasValue()) { auto code = parent->isStream ? parent->errorCode.Value() : AuExchange(parent->errorCode, {}).Value(); if (code != EStreamError::eErrorNone) { parent->asyncActive = false; parent->transaction->Reset(); return code; } } auto length = parent->transaction->GetLastPacketLength(); if (!length) { parent->asyncActive = false; parent->transaction->Reset(); return EStreamError::eErrorNone; } auto streamIndex = parent->lastAllocation->streamIndex; if (streamIndex == length) { return EStreamError::eErrorNone; } auto toRead = AuMin(parent->lastAllocation->length, length - streamIndex); auto bRequestedLen = bool(reqLength); out.ptr = this->parent->lastAllocation->ToPointer() + streamIndex; out.length = reqLength ? AuMin(toRead, reqLength) : toRead; if (bRequestedLen) { streamIndex += out.length; parent->lastAllocation->streamIndex = streamIndex; if (!parent->isStream) { parent->readOffset += out.length; } if (parent->lastAllocation->streamIndex == length) { parent->asyncActive = false; parent->transaction->Reset(); } } return EStreamError::eErrorNone; } EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite ¶meters) { if (!parameters.length) { SysPushErrorArg(); return EStreamError::eErrorEndOfStream; } // Read from the last tranaction, if not fully consumed if (parent->lastAllocation) { auto length = parent->transaction->GetLastPacketLength(); if (length && parent->lastAllocation->streamIndex != length) { auto toRead = AuMin(parameters.length, length - parent->lastAllocation->streamIndex); if (toRead) { if (parameters.ptr) { AuMemcpy(parameters.ptr, parent->lastAllocation->Begin() + parent->lastAllocation->streamIndex, toRead); if (parent->isStream) { parent->lastAllocation->streamIndex += toRead; } else { parent->lastAllocation->streamIndex += length; parent->readOffset += length; } } } if (parent->isStream) { parent->lastAllocation->streamIndex += length; } parameters.outVariable = toRead; return EStreamError::eErrorNone; } if (parent->transaction && parent->transaction->Failed()) { SysPushErrorIO("AIO transaction read failed: {}", parent->transaction->GetOSErrorCode()); parent->errorCode = EStreamError::eErrorStreamInterrupted; parent->lastAllocation.reset(); } } // Async error if (parent->errorCode.HasValue()) { auto code = parent->isStream ? parent->errorCode.Value() : AuExchange(parent->errorCode, {}).Value(); if (code != EStreamError::eErrorNone) { return code; } } // Async awaiting response if (parent->asyncActive && !parent->transaction->Complete()) { parameters.outVariable = 0; return EStreamError::eErrorNone; } // Async success or blank state parent->transaction->Reset(); parent->asyncActive = true; parent->lastAllocation = parent->AllocateNextPageCached(parameters.length); parent->lastAllocation->streamIndex = 0; if (!parent->transaction->StartRead(parent->isStream ? 0 : parent->readOffset, parent->lastAllocation)) { parent->asyncActive = false; SysPushErrorNested("Couldn't start async aio read"); return EStreamError::eErrorStreamInterrupted; } return EStreamError::eErrorNone; } void AsyncStreamReader::Close() { } AsyncStreamWriter::~AsyncStreamWriter() { if (used) { Flush(); } } EStreamError AsyncStreamWriter::IsOpen() { return this->parent->errorCode.HasValue() ? this->parent->errorCode.value() : EStreamError::eErrorNone; } EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead ¶meters) { used = true; if (!parameters.ptr) { return EStreamError::eErrorStreamInterrupted; } Preframe(); if (parent->errorCode.HasValue()) { auto code = parent->isStream ? parent->errorCode.Value() : AuExchange(parent->errorCode, {}).Value(); if (code != EStreamError::eErrorNone) { return code; } } auto newMem = AuMakeShared(parameters.length); if (!newMem) { SysPushErrorMem(); return EStreamError::eErrorStreamInterrupted; } if (!newMem->IsValid()) { SysPushErrorMem(); return EStreamError::eErrorStreamInterrupted; } AuMemcpy(newMem->ptr, parameters.ptr, parameters.length); parameters.outVariable = parameters.length; if (!AuTryInsert(this->writesPending, newMem)) { SysPushErrorMem(); return EStreamError::eErrorStreamInterrupted; } if (this->parent->flushOnWrite) { Frame(); } return EStreamError::eErrorNone; } void AsyncStreamWriter::Flush() { Preframe(); Frame(); } void AsyncStreamWriter::Close() { Flush(); } void AsyncStreamWriter::Preframe() { if (parent->transaction->Complete()) { if (parent->transaction->Failed()) { SysPushErrorIO("AIO transaction write failed: {}", parent->transaction->GetOSErrorCode()); parent->errorCode = EStreamError::eErrorStreamInterrupted; parent->lastAllocation.reset(); } else { parent->transaction->GetLastPacketLength(); } parent->transaction->Reset(); } } void AsyncStreamWriter::Frame() { AuSPtr buffer; if (this->writesPending.size() == 1) { buffer = AuMove(this->writesPending[0]); } else { AuUInt length {}; for (auto &a : this->writesPending) { length += a->length; } buffer = this->parent->AllocateNextPageCached(length); if (!buffer) { return; } AuUInt index {}; for (auto &a : this->writesPending) { if (a->length + index > buffer->length) { SysPanic(""); } AuMemcpy(buffer->Begin() + index, a->ptr, a->length); index += a->length; } //.... } // Async success or blank state parent->transaction->Reset(); parent->asyncActive = true; struct WriteMem : AuMemoryViewRead { AuSPtr write; }; auto annoying = AuMakeShared(); if (!annoying) { SysPushErrorMem(); return; } annoying->write = buffer; annoying->ptr = buffer->ptr; annoying->length = buffer->length; parent->lastAllocation = buffer; parent->lastAllocation->streamIndex = 0; if (!parent->transaction->StartWrite(parent->isStream ? 0 : parent->writeOffset, annoying)) { parent->asyncActive = false; SysPushErrorNested("Couldn't start async aio write"); return; } this->writesPending.clear(); } bool AsyncStreamWriter::HasWorkItems() { return this->writesPending.size(); } AuSPtr AsyncStreamAdapater::ToStreamReader() { if (this->locked != 0 && this->locked != 2) { return {}; } this->locked = 2; return AuSPtr(AuSharedFromThis(), &this->reader); } AuSPtr AsyncStreamAdapater::ToStreamWriter() { if (this->locked != 0 && this->locked != 1) { return {}; } this->locked = 1; return AuSPtr(AuSharedFromThis(), &this->writer); } AuSPtr AsyncStreamAdapater::ToWaitable() { this->source.SetLoopSource(this->transaction->NewLoopSource()); return AuSPtr(AuSharedFromThis(), &this->source); } void AsyncStreamAdapater::ReserveBuffer(AuUInt length) { if (!this->lastAllocation || !this->asyncActive) { this->lastAllocation = this->AllocateNextPageCached(length); } } bool AsyncStreamAdapater::Reset() { if (this->locked == 1) { if (this->writer.HasWorkItems()) { return false; } this->writer.Flush(); } if (this->asyncActive) { if (!this->transaction->Complete()) { return false; } } this->locked = 0; this->transaction->Reset(); this->writeOffset = 0; this->readOffset = 0; return true; } AUKN_SYM AuSPtr NewAsyncStreamAdapter(const AuSPtr &transaction, bool isStream) { if (!transaction) { SysPushErrorArg(); return {}; } auto adapter = AuMakeShared(); if (!adapter) { return {}; } if (!adapter->Init(transaction, isStream)) { return {}; } return adapter; } }