690 lines
18 KiB
C++
690 lines
18 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: AuIOAdapterAsyncStream.cpp
|
|
Date: 2022-6-6
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include <Aurora/IO/IOExperimental.hpp>
|
|
#include "AuIOAdapterAsyncStream.hpp"
|
|
#include "../AuIOWaitableIOLoopSource.hpp"
|
|
|
|
namespace Aurora::IO::Adapters
|
|
{
|
|
struct AsyncStreamAdapter;
|
|
|
|
struct AsyncStreamReader : IAsyncStreamReader
|
|
{
|
|
AsyncStreamAdapter *parent;
|
|
|
|
EStreamError IsOpen() override;
|
|
EStreamError Read(const Memory::MemoryViewStreamWrite ¶meters) override;
|
|
void Close() override;
|
|
|
|
EStreamError BeginRead(const AuSPtr<Memory::MemoryViewWrite> &internalView) override;
|
|
EStreamError Dequeue(AuUInt uReqLength, Memory::MemoryViewWrite &out) override;
|
|
};
|
|
|
|
struct AsyncStreamMemory : AuMemoryViewWrite
|
|
{
|
|
AsyncStreamMemory(const AuMemoryViewWrite &ref);
|
|
AsyncStreamMemory(const AuSPtr<AuMemoryViewWrite> &ref);
|
|
AsyncStreamMemory(AuUInt uLength);
|
|
~AsyncStreamMemory();
|
|
|
|
bool IsValid();
|
|
bool owned {};
|
|
AuSPtr<AuMemoryViewWrite> pin;
|
|
AuUInt streamIndex {};
|
|
// todo shared
|
|
};
|
|
|
|
struct AsyncStreamWriter : IStreamWriter
|
|
{
|
|
~AsyncStreamWriter();
|
|
AsyncStreamAdapter *parent;
|
|
|
|
EStreamError IsOpen() override;
|
|
EStreamError Write(const Memory::MemoryViewStreamRead ¶meters) override;
|
|
void Close() override;
|
|
void Flush() override;
|
|
|
|
void Preframe();
|
|
void Frame();
|
|
|
|
AuList<AuSPtr<AsyncStreamMemory>> writesPending;
|
|
bool HasWorkItems();
|
|
bool used {};
|
|
};
|
|
|
|
// Exposes an IAsyncTransaction as a stream reader or a stream writer.
// The reader/writer/waitable objects handed out are members of this object and
// are returned as shared pointers aliasing the adapter itself.
struct AsyncStreamAdapter : IAsyncStreamAdapter, AuEnableSharedFromThis<AsyncStreamAdapter>
{
    AsyncStreamAdapter();

    // Returns the cached buffer if it is large enough, otherwise allocates (and caches) a new one.
    AuSPtr<AsyncStreamMemory> AllocateNextPageCached(AuUInt64 uLength);

    // IAsyncStreamAdapter
    AuSPtr<IAsyncStreamReader> ToStreamReader() override;
    AuSPtr<IStreamWriter> ToStreamWriter() override;

    AuSPtr<IIOWaitableItem> ToWaitable() override;

    bool Reset() override;

    bool Init(const AuSPtr<IAsyncTransaction> &transaction, bool bIsStream);

    // Buffer of the most recent (or in-flight) transaction.
    AuSPtr<AsyncStreamMemory> lastAllocation;
    AuSPtr<IAsyncTransaction> transaction;

    bool SetFlushOnWrite(bool value) override;

    void ReserveBuffer(AuUInt64 uLength) override;

    AuUInt64 GetReadOffset() override;
    AuUInt64 SetReadOffset(AuUInt64 offset) override;

    AuUInt64 GetWriteOffset() override;
    AuUInt64 SetWriteOffset(AuUInt64 offset) override;

    // Set once a read/write has been submitted on `transaction`.
    bool bAsyncActive {};
    // When set, transactions are always started at offset 0 instead of readOffset/writeOffset.
    bool bIsStream {};
    // When set, every Write() immediately submits the queued data.
    bool bFlushOnWrite { true };

    AuUInt64 readOffset {};
    AuUInt64 writeOffset {};

    // Sticky error reported by IsOpen() and by subsequent reads/writes.
    AuOptionalEx<EStreamError> errorCode;
    // 0 = unclaimed, 1 = claimed by ToStreamWriter(), 2 = claimed by ToStreamReader().
    int locked {};

    IOWatachableIOLoopSource source;

    // impl
    AsyncStreamReader reader;
    AsyncStreamWriter writer;
};
|
|
|
|
|
|
// Borrowing constructor: aliases the pointer/length of `ref` without taking
// ownership (`owned` stays false and nothing is pinned).
AsyncStreamMemory::AsyncStreamMemory(const AuMemoryViewWrite &ref)
    : AuMemoryViewWrite(ref.ptr, ref.length)
{
}
|
|
|
|
// Pinning constructor: aliases the pointer/length of `*ref` and stores `ref`
// in `pin` so the shared view is held for the lifetime of this object.
// `ref` is dereferenced unconditionally, so it must not be null.
AsyncStreamMemory::AsyncStreamMemory(const AuSPtr<AuMemoryViewWrite> &ref)
    : AuMemoryViewWrite(ref->ptr, ref->length)
    , pin(ref)
{
}
|
|
|
|
// Owning constructor: allocates `uLength` bytes through AuMemory::ZAlloc, passing
// the hardware page size as its second argument, and marks the block as owned
// so the destructor frees it. Check IsValid() afterwards for allocation failure.
AsyncStreamMemory::AsyncStreamMemory(AuUInt uLength)
    : AuMemoryViewWrite(AuMemory::ZAlloc<AuUInt8*>(uLength, AuHwInfo::GetPageSize()), uLength)
    , owned(true)
{
}
|
|
|
|
// Frees the block only when this object allocated it itself; borrowed and
// pinned views are left untouched.
AsyncStreamMemory::~AsyncStreamMemory()
{
    const bool bMustRelease = this->ptr && this->owned;
    if (!bMustRelease)
    {
        return;
    }

    AuMemory::Free(this->ptr);
    this->ptr = nullptr;
}
|
|
|
|
bool AsyncStreamMemory::IsValid()
|
|
{
|
|
return bool(this->ptr);
|
|
}
|
|
|
|
bool AsyncStreamAdapter::SetFlushOnWrite(bool value)
|
|
{
|
|
return AuExchange(this->bFlushOnWrite, value);
|
|
}
|
|
|
|
// Returns the offset the next non-stream read will be started at.
AuUInt64 AsyncStreamAdapter::GetReadOffset()
{
    const AuUInt64 uCurrent = this->readOffset;
    return uCurrent;
}
|
|
|
|
// Replaces the read offset and returns the previous one.
// When the adapter is claimed by a writer (locked == 1) the writer's completed
// transaction, if any, is settled first.
// NOTE(review): settling the *writer* from the *read* offset setter looks unusual — confirm intent.
AuUInt64 AsyncStreamAdapter::SetReadOffset(AuUInt64 offset)
{
    const bool bWriterClaimed = (this->locked == 1);
    if (bWriterClaimed)
    {
        this->writer.Preframe();
    }

    const AuUInt64 uPrevious = this->readOffset;
    this->readOffset = offset;
    return uPrevious;
}
|
|
|
|
// Returns the write offset. When the adapter is claimed by a writer
// (locked == 1) a completed transaction is settled before the value is read.
AuUInt64 AsyncStreamAdapter::GetWriteOffset()
{
    const bool bWriterClaimed = (this->locked == 1);
    if (bWriterClaimed)
    {
        this->writer.Preframe();
    }

    const AuUInt64 uCurrent = this->writeOffset;
    return uCurrent;
}
|
|
|
|
// Replaces the write offset and returns the previous one.
// Like GetWriteOffset(), a completed writer transaction is settled first when
// the adapter is claimed by a writer (locked == 1). This way any bookkeeping
// Preframe() does for the finished write is applied to the old offset rather
// than to the value the caller is installing here.
AuUInt64 AsyncStreamAdapter::SetWriteOffset(AuUInt64 offset)
{
    if (this->locked == 1)
    {
        this->writer.Preframe();
    }

    return AuExchange(this->writeOffset, offset);
}
|
|
|
|
bool AsyncStreamAdapter::Init(const AuSPtr<IAsyncTransaction> &transaction, bool bIsStream)
|
|
{
|
|
this->transaction = transaction;
|
|
this->lastAllocation.reset();
|
|
this->bAsyncActive = false;
|
|
this->reader.parent = this;
|
|
this->writer.parent = this;
|
|
this->bIsStream = bIsStream;
|
|
return true;
|
|
}
|
|
|
|
// Constructs the adapter with an empty waitable source; the actual loop source
// is attached later by ToWaitable().
AsyncStreamAdapter::AsyncStreamAdapter()
    : source({})
{
}
|
|
|
|
// Returns a buffer of at least `uLength` bytes.
// The cached `lastAllocation` is returned as-is when it is large enough;
// otherwise a new block is allocated and becomes the cached one.
// Returns an empty pointer (cache untouched) when the allocation fails.
AuSPtr<AsyncStreamMemory> AsyncStreamAdapter::AllocateNextPageCached(AuUInt64 uLength)
{
    const bool bCacheFits = this->lastAllocation &&
                            (this->lastAllocation->length >= uLength);
    if (bCacheFits)
    {
        return this->lastAllocation;
    }

    auto pFresh = AuMakeShared<AsyncStreamMemory>(uLength);
    SysCheckNotNull(pFresh, {});

    if (pFresh->IsValid())
    {
        this->lastAllocation = pFresh;
        return this->lastAllocation;
    }

    SysPushErrorMem();
    return {};
}
|
|
|
|
// Reports the adapter's stored error code, or eErrorNone when none is set.
EStreamError AsyncStreamReader::IsOpen()
{
    auto &status = this->parent->errorCode;

    if (status.HasValue())
    {
        return status.value();
    }

    return EStreamError::eErrorNone;
}
|
|
|
|
// Starts an asynchronous read directly into the caller-provided `internalView`.
//
// Fails with eErrorStreamInterrupted when:
//  - `internalView` is null,
//  - the previous packet has not been fully consumed,
//  - a previously submitted transaction has not completed yet,
//  - the buffer wrapper cannot be allocated or the read cannot be started.
// Returns eErrorNone once the read has been submitted; the data is retrieved
// later (see Dequeue()).
EStreamError AsyncStreamReader::BeginRead(const AuSPtr<Memory::MemoryViewWrite> &internalView)
{
    // The pinning AsyncStreamMemory constructor dereferences the view, so a null
    // pointer has to be rejected before anything else happens.
    if (!internalView)
    {
        SysPushErrorArg();
        return EStreamError::eErrorStreamInterrupted;
    }

    if (parent->lastAllocation)
    {
        auto uLength = parent->transaction->GetLastPacketLength();
        if (uLength &&
            parent->lastAllocation->streamIndex != uLength)
        {
            // Unconsumed bytes of the previous packet would be lost.
            AuDebugBreak();
            return EStreamError::eErrorStreamInterrupted;
        }

        parent->lastAllocation.reset();
    }

    if (parent->bAsyncActive && !parent->transaction->Complete())
    {
        AuDebugBreak();
        return EStreamError::eErrorStreamInterrupted;
    }

    // Allocate the wrapper before touching the transaction so that a failure
    // does not leave the adapter flagged as active with nothing in flight.
    auto pView = AuMakeShared<AsyncStreamMemory>(internalView);
    if (!pView)
    {
        AuDebugBreak();
        return EStreamError::eErrorStreamInterrupted;
    }

    // Async success or blank state
    parent->transaction->Reset();
    parent->bAsyncActive = true;

    parent->lastAllocation = pView;
    parent->lastAllocation->streamIndex = 0;

    if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation))
    {
        parent->bAsyncActive = false;
        SysPushErrorNested("Couldn't start async aio read");
        return EStreamError::eErrorStreamInterrupted;
    }

    return EStreamError::eErrorNone;
}
|
|
|
|
// Hands out a view into the buffer of the completed read transaction.
//
// `out` is cleared first and stays empty when the transaction has not completed,
// produced no data, has no buffer, or has already been fully consumed.
// uReqLength == 0 : `out` covers everything that is left, and the cursor is
//                   NOT advanced (peek).
// uReqLength != 0 : `out` covers at most uReqLength bytes and the cursor (plus
//                   readOffset in non-stream mode) advances by out.length; once
//                   the packet is fully consumed the transaction is reset.
// Returns the transaction/adapter error if there is one, otherwise eErrorNone.
EStreamError AsyncStreamReader::Dequeue(AuUInt uReqLength, Memory::MemoryViewWrite &out)
{
    out = {};

    if (!parent->transaction->Complete())
    {
        return EStreamError::eErrorNone;
    }

    // Transaction error
    if (parent->transaction->HasFailed())
    {
        // Report the OS error before Reset(): the code is read from the
        // transaction that Reset() is about to clear.
        SysPushErrorIO("IO: {}", parent->transaction->GetOSErrorCode());
        parent->bAsyncActive = false;
        parent->transaction->Reset();
        return EStreamError::eErrorStreamInterrupted;
    }

    // Async error
    if (parent->errorCode.HasValue())
    {
        // Stream mode keeps the error sticky; otherwise it is consumed here.
        auto code = parent->bIsStream ?
                    parent->errorCode.Value() :
                    AuExchange(parent->errorCode, {}).Value();

        if (code != EStreamError::eErrorNone)
        {
            parent->bAsyncActive = false;
            parent->transaction->Reset();
            return code;
        }
    }

    auto uLength = parent->transaction->GetLastPacketLength();
    if (!uLength)
    {
        parent->bAsyncActive = false;
        parent->transaction->Reset();
        return EStreamError::eErrorNone;
    }

    // No buffer to hand out (e.g. it was dropped after an earlier failure).
    if (!parent->lastAllocation)
    {
        return EStreamError::eErrorNone;
    }

    auto streamIndex = parent->lastAllocation->streamIndex;
    if (streamIndex >= uLength)
    {
        // Fully consumed; '>=' also prevents the subtraction below from wrapping.
        return EStreamError::eErrorNone;
    }

    auto toRead = AuMin<AuUInt>(parent->lastAllocation->length, uLength - streamIndex);
    auto bRequestedLen = bool(uReqLength);

    out.ptr = this->parent->lastAllocation->ToPointer() + streamIndex;
    out.length = uReqLength ? AuMin(toRead, uReqLength) : toRead;

    if (bRequestedLen)
    {
        streamIndex += out.length;
        parent->lastAllocation->streamIndex = streamIndex;

        if (!parent->bIsStream)
        {
            parent->readOffset += out.length;
        }

        if (parent->lastAllocation->streamIndex == uLength)
        {
            parent->bAsyncActive = false;
            parent->transaction->Reset();
        }
    }

    return EStreamError::eErrorNone;
}
|
|
|
|
// Non-blocking read.
//
// 1. If the last completed packet still has unread bytes, up to
//    parameters.length of them are delivered (copied when parameters.ptr is
//    non-null, skipped otherwise) and parameters.outVariable is set to that count.
// 2. Otherwise a stored adapter error is returned, if any.
// 3. Otherwise, if a transaction is still in flight, outVariable is 0 and
//    eErrorNone is returned.
// 4. Otherwise a new asynchronous read of parameters.length bytes is started;
//    outVariable is 0 and the data is delivered by a later call.
//
// A zero-length request is rejected with eErrorEndOfStream.
EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{
    if (!parameters.length)
    {
        SysPushErrorArg();
        return EStreamError::eErrorEndOfStream;
    }

    // Read from the last transaction, if not fully consumed
    if (parent->lastAllocation)
    {
        auto uLength = parent->transaction->GetLastPacketLength();
        auto &uCursor = parent->lastAllocation->streamIndex;

        // '<' (rather than '!=') keeps the subtraction below from wrapping.
        if (uLength && uCursor < uLength)
        {
            auto toRead = AuMin<AuUInt>(parameters.length, uLength - uCursor);

            if (parameters.ptr)
            {
                AuMemcpy(parameters.ptr, parent->lastAllocation->Begin<AuUInt8>() + uCursor, toRead);
            }

            // Advance by exactly what is reported to the caller. Advancing by the
            // whole packet length would drop unread bytes or push the cursor past
            // the packet, making the next call read out of bounds.
            uCursor += toRead;

            if (!parent->bIsStream)
            {
                parent->readOffset += toRead;
            }

            parameters.outVariable = toRead;
            return EStreamError::eErrorNone;
        }

        if (parent->transaction && parent->transaction->HasFailed())
        {
            SysPushErrorIO("AIO transaction read failed: {}", parent->transaction->GetOSErrorCode());
            parent->errorCode = EStreamError::eErrorStreamInterrupted;
            parent->lastAllocation.reset();
        }
    }

    // Async error
    if (parent->errorCode.HasValue())
    {
        // Stream mode keeps the error sticky; otherwise it is consumed here.
        auto code = parent->bIsStream ?
                    parent->errorCode.Value() :
                    AuExchange(parent->errorCode, {}).Value();

        if (code != EStreamError::eErrorNone)
        {
            return code;
        }
    }

    // Async awaiting response
    if (parent->bAsyncActive && !parent->transaction->Complete())
    {
        parameters.outVariable = 0;
        return EStreamError::eErrorNone;
    }

    // Async success or blank state
    parent->transaction->Reset();

    // Nothing is delivered by the call that merely submits the read.
    parameters.outVariable = 0;

    auto pNext = parent->AllocateNextPageCached(parameters.length);
    if (!pNext)
    {
        // AllocateNextPageCached returns an empty pointer on allocation failure.
        parent->bAsyncActive = false;
        return EStreamError::eErrorOutOfMemory;
    }

    parent->bAsyncActive = true;
    parent->lastAllocation = pNext;
    parent->lastAllocation->streamIndex = 0;

    if (!parent->transaction->StartRead(parent->bIsStream ? 0 : parent->readOffset, parent->lastAllocation))
    {
        parent->bAsyncActive = false;
        SysPushErrorNested("Couldn't start async aio read");
        return EStreamError::eErrorStreamInterrupted;
    }

    return EStreamError::eErrorNone;
}
|
|
|
|
// Intentionally a no-op: the reader holds no resources of its own.
void AsyncStreamReader::Close()
{
}
|
|
|
|
// Flushes on destruction, but only if Write() was ever called on this writer.
AsyncStreamWriter::~AsyncStreamWriter()
{
    if (!this->used)
    {
        return;
    }

    this->Flush();
}
|
|
|
|
// Reports the adapter's stored error code, or eErrorNone when none is set.
EStreamError AsyncStreamWriter::IsOpen()
{
    auto &status = this->parent->errorCode;

    if (status.HasValue())
    {
        return status.value();
    }

    return EStreamError::eErrorNone;
}
|
|
|
|
// Copies the caller's data into a private buffer and queues it for submission.
//
// A completed previous transaction is settled first and a stored adapter error
// is returned instead of queuing. On success parameters.outVariable is set to
// parameters.length, and the queue is submitted right away when the adapter's
// flush-on-write flag is set.
// Returns eErrorStreamInterrupted for a null source pointer or when the copy
// buffer cannot be allocated/queued, eErrorOutOfMemory when the buffer object
// itself cannot be created.
EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead &parameters)
{
    used = true;

    if (!parameters.ptr)
    {
        SysPushErrorArg();
        return EStreamError::eErrorStreamInterrupted;
    }

    Preframe();

    if (parent->errorCode.HasValue())
    {
        // Stream mode keeps the error sticky; otherwise it is consumed here.
        auto code = parent->bIsStream ?
                    parent->errorCode.Value() :
                    AuExchange(parent->errorCode, {}).Value();

        if (code != EStreamError::eErrorNone)
        {
            return code;
        }
    }

    auto newMem = AuMakeShared<AsyncStreamMemory>(parameters.length);
    SysCheckNotNull(newMem, EStreamError::eErrorOutOfMemory);

    if (!newMem->IsValid())
    {
        SysPushErrorMem();
        return EStreamError::eErrorStreamInterrupted;
    }

    AuMemcpy(newMem->ptr, parameters.ptr, parameters.length);

    if (!AuTryInsert(this->writesPending, newMem))
    {
        SysPushErrorMem();
        return EStreamError::eErrorStreamInterrupted;
    }

    // Only report the bytes as written once they are actually queued.
    parameters.outVariable = parameters.length;

    if (this->parent->bFlushOnWrite)
    {
        Frame();
    }

    return EStreamError::eErrorNone;
}
|
|
|
|
void AsyncStreamWriter::Flush()
|
|
{
|
|
Preframe();
|
|
Frame();
|
|
}
|
|
|
|
void AsyncStreamWriter::Close()
|
|
{
|
|
Flush();
|
|
}
|
|
|
|
void AsyncStreamWriter::Preframe()
|
|
{
|
|
if (parent->transaction->Complete())
|
|
{
|
|
if (parent->transaction->HasFailed())
|
|
{
|
|
SysPushErrorIO("AIO transaction write failed: {}", parent->transaction->GetOSErrorCode());
|
|
parent->errorCode = EStreamError::eErrorStreamInterrupted;
|
|
parent->lastAllocation.reset();
|
|
}
|
|
else
|
|
{
|
|
parent->transaction->GetLastPacketLength();
|
|
}
|
|
|
|
parent->transaction->Reset();
|
|
}
|
|
}
|
|
|
|
void AsyncStreamWriter::Frame()
|
|
{
|
|
AuSPtr<AsyncStreamMemory> buffer;
|
|
|
|
if (this->writesPending.size() == 1)
|
|
{
|
|
buffer = AuMove(this->writesPending[0]);
|
|
}
|
|
else
|
|
{
|
|
AuUInt uLength {};
|
|
|
|
for (auto &a : this->writesPending)
|
|
{
|
|
uLength += a->length;
|
|
}
|
|
|
|
buffer = this->parent->AllocateNextPageCached(uLength);
|
|
if (!buffer)
|
|
{
|
|
return;
|
|
}
|
|
|
|
AuUInt index {};
|
|
|
|
for (auto &a : this->writesPending)
|
|
{
|
|
if (a->length + index > buffer->length)
|
|
{
|
|
SysPanic("");
|
|
}
|
|
|
|
AuMemcpy(buffer->Begin<AuUInt8>() + index, a->ptr, a->length);
|
|
index += a->length;
|
|
}
|
|
|
|
//....
|
|
}
|
|
|
|
// Async success or blank state
|
|
parent->transaction->Reset();
|
|
parent->bAsyncActive = true;
|
|
|
|
struct WriteMem : AuMemoryViewRead
|
|
{
|
|
AuSPtr<AsyncStreamMemory> write;
|
|
};
|
|
|
|
auto annoying = AuMakeShared<WriteMem>();
|
|
if (!annoying)
|
|
{
|
|
SysPushErrorMem();
|
|
return;
|
|
}
|
|
|
|
annoying->write = buffer;
|
|
annoying->ptr = buffer->ptr;
|
|
annoying->length = buffer->length;
|
|
|
|
parent->lastAllocation = buffer;
|
|
parent->lastAllocation->streamIndex = 0;
|
|
|
|
if (!parent->transaction->StartWrite(parent->bIsStream ? 0 : parent->writeOffset, annoying))
|
|
{
|
|
parent->bAsyncActive = false;
|
|
SysPushErrorNested("Couldn't start async aio write");
|
|
return;
|
|
}
|
|
|
|
this->writesPending.clear();
|
|
}
|
|
|
|
bool AsyncStreamWriter::HasWorkItems()
|
|
{
|
|
return this->writesPending.size();
|
|
}
|
|
|
|
// Claims the adapter for reading (locked = 2) and returns the embedded reader
// as a shared pointer that shares ownership with the adapter itself.
// Returns an empty pointer when the adapter is already claimed by a writer.
AuSPtr<IAsyncStreamReader> AsyncStreamAdapter::ToStreamReader()
{
    const bool bAvailable = (this->locked == 0) || (this->locked == 2);
    if (!bAvailable)
    {
        return {};
    }

    this->locked = 2;

    auto pSelf = AuSharedFromThis();
    return AuSPtr<IAsyncStreamReader>(pSelf, &this->reader);
}
|
|
|
|
// Claims the adapter for writing (locked = 1) and returns the embedded writer
// as a shared pointer that shares ownership with the adapter itself.
// Returns an empty pointer when the adapter is already claimed by a reader.
AuSPtr<IStreamWriter> AsyncStreamAdapter::ToStreamWriter()
{
    const bool bAvailable = (this->locked == 0) || (this->locked == 1);
    if (!bAvailable)
    {
        return {};
    }

    this->locked = 1;

    auto pSelf = AuSharedFromThis();
    return AuSPtr<IStreamWriter>(pSelf, &this->writer);
}
|
|
|
|
// Requests a new loop source from the transaction, attaches it to the embedded
// waitable and returns that waitable as a shared pointer sharing ownership with
// the adapter. Returns an empty pointer when no loop source could be created.
AuSPtr<IIOWaitableItem> AsyncStreamAdapter::ToWaitable()
{
    if (auto pLoopSource = this->transaction->NewLoopSource())
    {
        this->source.SetLoopSource(pLoopSource);
        return AuSPtr<IIOWaitableItem>(AuSharedFromThis(), &this->source);
    }

    return {};
}
|
|
|
|
// Pre-allocates (or reuses) a buffer of at least `uLength` bytes.
// Skipped when a buffer exists and the async-active flag is set.
void AsyncStreamAdapter::ReserveBuffer(AuUInt64 uLength)
{
    const bool bBufferInUse = this->lastAllocation && this->bAsyncActive;
    if (bBufferInUse)
    {
        return;
    }

    this->lastAllocation = this->AllocateNextPageCached(uLength);
}
|
|
|
|
bool AsyncStreamAdapter::Reset()
|
|
{
|
|
if (this->locked == 1)
|
|
{
|
|
if (this->writer.HasWorkItems())
|
|
{
|
|
return false;
|
|
}
|
|
|
|
this->writer.Flush();
|
|
}
|
|
|
|
if (this->bAsyncActive)
|
|
{
|
|
if (!this->transaction->Complete())
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
this->locked = 0;
|
|
this->transaction->Reset();
|
|
this->writeOffset = 0;
|
|
this->readOffset = 0;
|
|
return true;
|
|
}
|
|
|
|
// Factory: creates an adapter bound to `pTransaction`.
// Returns an empty pointer when the transaction is null (an argument error is
// pushed), when the adapter cannot be allocated, or when Init() fails.
AUKN_SYM AuSPtr<IAsyncStreamAdapter> NewAsyncStreamAdapter(const AuSPtr<IAsyncTransaction> &pTransaction, bool bIsStream)
{
    if (!pTransaction)
    {
        SysPushErrorArg();
        return {};
    }

    if (auto pAdapter = AuMakeShared<AsyncStreamAdapter>())
    {
        if (pAdapter->Init(pTransaction, bIsStream))
        {
            return pAdapter;
        }
    }

    return {};
}
|
|
} |