AuroraRuntime/Source/IO/Adapters/AuIOAdapterAsyncDelegators.cpp

270 lines
8.4 KiB
C++

/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOAdapterAsyncDelegators.cpp
Date: 2024-2-24
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuIOAdapterAsyncDelegators.hpp"
#include <Source/IO/Async/AuIOThreadPool.hpp>
#include <Source/IO/Async/AuIOAPCLessWaitable.hpp>
namespace Aurora::IO::Adapters
{
// Performs the single transfer staged by StartRead()/StartWrite() and then signals completion.
//
// Dispatch rules, in order:
//  * iBaseOffset + uLastOffset negative      -> eErrorOutOfBounds, no stream call is made
//  * pReadView set  (staged by StartWrite)   -> pStreamWriter->Write, else pStreamWriterEx->ArbitraryWrite
//  * pWriteView set (staged by StartRead)    -> pStreamReader->Read,  else pStreamReaderEx->ArbitraryRead
// The sequential stream is preferred over the seeking ("Ex") one when both are present; only the
// seeking variants receive the absolute offset (iBaseOffset + uLastOffset).
//
// Any exception escaping the stream call is converted into eErrorGenericFault.
//
// info: unused by this implementation.
//
// NOTE(review): presumably invoked on a worker of `this->workers` through the work item created in
// StartRead()/StartWrite() - confirm against the IWorkItemHandler contract.
void AsyncReaderWriter::DispatchFrame(ProcessInfo &info)
{
    // Drop the byte count left over from the previous operation. Paths that never reach a stream
    // call (such as the out-of-bounds rejection below) do not touch uLastLength, so without this
    // the subscriber and GetLastPacketLength() would report the previous operation's length.
    this->uLastLength = 0;

    try
    {
        if (this->iBaseOffset + AuInt64(this->uLastOffset) < 0)
        {
            this->eStreamError = EStreamError::eErrorOutOfBounds;
        }
        else if (this->pReadView)
        {
            // Write request: the caller's read-only view is the data source.
            if (this->pStreamWriter)
            {
                this->eStreamError = this->pStreamWriter->Write(AuMemoryViewStreamRead(this->pReadView, this->uLastLength));
            }
            else if (this->pStreamWriterEx)
            {
                this->eStreamError = this->pStreamWriterEx->ArbitraryWrite(this->iBaseOffset + this->uLastOffset, AuMemoryViewStreamRead(this->pReadView, this->uLastLength));
            }
        }
        else if (this->pWriteView)
        {
            // Read request: the caller's writable view is the destination.
            if (this->pStreamReader)
            {
                this->eStreamError = this->pStreamReader->Read(AuMemoryViewStreamWrite(this->pWriteView, this->uLastLength));
            }
            else if (this->pStreamReaderEx)
            {
                this->eStreamError = this->pStreamReaderEx->ArbitraryRead(this->iBaseOffset + this->uLastOffset, AuMemoryViewStreamWrite(this->pWriteView, this->uLastLength));
            }
        }
    }
    catch (...)
    {
        this->eStreamError = EStreamError::eErrorGenericFault;
    }

    // Clear the busy flag before signalling so that observers of the completion see an idle object.
    this->bInProgress = false;
    this->SignalComplete();
}
// Failure hook: returns the transaction to its idle state.
//
// The busy flag has to be cleared first because Reset() returns early while bInProgress is set.
void AsyncReaderWriter::OnFailure()
{
    this->bInProgress = false;

    // With the flag cleared, this drops both staged views and resets the waitable.
    this->Reset();
}
// Stages an asynchronous read into `memoryView` and dispatches it to the worker pool.
//
// uOffset:    offset of the request; combined with the base offset for seeking readers.
// memoryView: destination buffer. Only the view is stored, so the underlying memory presumably
//             has to stay valid until the operation completes - TODO confirm with callers.
//
// Returns false when:
//  * memoryView is empty/invalid (an argument error is pushed),
//  * another operation is still in progress,
//  * the transaction has no reader attached,
//  * the work item could not be created, or
//  * Dispatch() reports failure.
bool AsyncReaderWriter::StartRead(AuUInt64 uOffset, const Memory::MemoryViewWrite &memoryView)
{
    if (!memoryView)
    {
        SysPushErrorArg();
        return false;
    }

    if (this->bInProgress)
    {
        return false;
    }

    if (!this->pStreamReaderEx && !this->pStreamReader)
    {
        return false;
    }

    // Exactly one of the two views may be set; DispatchFrame() uses that to pick the direction.
    AuResetMember(this->pReadView);
    AuResetMember(this->pWriteView);

    this->bInProgress = true;
    this->uLastOffset = uOffset;
    this->pWriteView = memoryView;

    auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
    if (!pThat)
    {
        // Nothing was scheduled, so neither DispatchFrame() nor OnFailure() will ever run to clear
        // the busy flag. Undo the staging here; otherwise every later Start*() call is rejected
        // and Reset() stays a no-op for the lifetime of the object.
        AuResetMember(this->pWriteView);
        this->bInProgress = false;
        return false;
    }

    // NOTE(review): if Dispatch() can fail without OnFailure() being invoked, the same rollback is
    // required on that path - confirm against the work item implementation.
    return pThat->Dispatch();
}
// Stages an asynchronous write of `memoryView` and dispatches it to the worker pool.
//
// uOffset:    offset of the request; combined with the base offset for seeking writers.
// memoryView: source buffer. Only the view is stored, so the underlying memory presumably has to
//             stay valid until the operation completes - TODO confirm with callers.
//
// Returns false when:
//  * memoryView is empty/invalid (an argument error is pushed),
//  * another operation is still in progress,
//  * the transaction has no writer attached,
//  * the work item could not be created, or
//  * Dispatch() reports failure.
bool AsyncReaderWriter::StartWrite(AuUInt64 uOffset, const Memory::MemoryViewRead &memoryView)
{
    if (!memoryView)
    {
        SysPushErrorArg();
        return false;
    }

    if (this->bInProgress)
    {
        return false;
    }

    if (!this->pStreamWriterEx && !this->pStreamWriter)
    {
        return false;
    }

    // Exactly one of the two views may be set; DispatchFrame() uses that to pick the direction.
    AuResetMember(this->pReadView);
    AuResetMember(this->pWriteView);

    this->bInProgress = true;
    this->uLastOffset = uOffset;
    this->pReadView = memoryView;

    auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
    if (!pThat)
    {
        // Nothing was scheduled, so neither DispatchFrame() nor OnFailure() will ever run to clear
        // the busy flag. Undo the staging here; otherwise every later Start*() call is rejected
        // and Reset() stays a no-op for the lifetime of the object.
        AuResetMember(this->pReadView);
        this->bInProgress = false;
        return false;
    }

    // NOTE(review): if Dispatch() can fail without OnFailure() being invoked, the same rollback is
    // required on that path - confirm against the work item implementation.
    return pThat->Dispatch();
}
// Returns the byte count recorded for the most recent transfer (uLastLength), converted to the
// 32-bit return type of the interface.
AuUInt32 AsyncReaderWriter::GetLastPacketLength()
{
    const AuUInt32 uLength = this->uLastLength;
    return uLength;
}
// Completion hook: releases the staged views and notifies the subscriber, if any, with the offset
// and byte count of the operation that just finished.
//
// NOTE(review): the name suggests this runs on the thread that started the operation - confirm
// against Async::APCLessWaitable.
void AsyncReaderWriter::OnOriginThreadComplete()
{
    // Release the views BEFORE calling out. The busy flag is already clear at this point, so the
    // subscriber may legally start the next operation on this transaction from inside its
    // callback; resetting afterwards would wipe the view that the new operation just staged.
    AuResetMember(this->pReadView);
    AuResetMember(this->pWriteView);

    // Hold a local reference so the subscriber stays alive for the duration of the call even if it
    // replaces itself through SetCallback().
    auto pCallback = this->pSubscriber;
    if (pCallback)
    {
        pCallback->OnAsyncFileOpFinished(this->uLastOffset, this->GetLastPacketLength());
    }
}
// Runs the waitable's local check and then reports whether the transaction has been signalled.
bool AsyncReaderWriter::Complete()
{
    Async::APCLessWaitable::CheckLocal();

    const bool bSignaled = Async::APCLessWaitable::HasBeenSignaled();
    return bSignaled;
}
// True once the transaction has been signalled AND the recorded stream error is anything other
// than eErrorNone.
bool AsyncReaderWriter::HasFailed()
{
    if (!Async::APCLessWaitable::HasBeenSignaled())
    {
        // Not finished yet, so no outcome to report.
        return false;
    }

    return this->eStreamError != EStreamError::eErrorNone;
}
// True once the transaction has been signalled AND the recorded stream error is eErrorNone.
bool AsyncReaderWriter::HasCompleted()
{
    if (!Async::APCLessWaitable::HasBeenSignaled())
    {
        // Not finished yet, so no outcome to report.
        return false;
    }

    return this->eStreamError == EStreamError::eErrorNone;
}
// Returns 0 unless HasFailed() is true; on failure the value is the recorded EStreamError
// converted to AuUInt (i.e. a stream error enumerator rather than a native OS error number).
AuUInt AsyncReaderWriter::GetOSErrorCode()
{
    if (!this->HasFailed())
    {
        return 0;
    }

    return AuUInt(this->eStreamError);
}
// Installs (or, when given an empty pointer, removes) the subscriber that
// OnOriginThreadComplete() notifies through OnAsyncFileOpFinished().
void AsyncReaderWriter::SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber)
{
    // The previous subscriber reference, if any, is replaced.
    this->pSubscriber = pSubscriber;
}
// Waits on the transaction's loop source.
//
// uTimeout: forwarded unchanged to ILoopSource::WaitOn (units are defined by that API).
//
// Returns the result of WaitOn(), or false when no loop source could be obtained.
bool AsyncReaderWriter::Wait(AuUInt32 uTimeout)
{
    auto pLoopSource = this->NewLoopSource();
    if (!pLoopSource)
    {
        // Previously the pointer was dereferenced unconditionally; report "not satisfied" rather
        // than crashing when the waitable cannot hand out a loop source.
        return false;
    }

    return pLoopSource->WaitOn(uTimeout);
}
// Returns the loop source provided by the underlying Async::APCLessWaitable.
AuSPtr<IO::Loop::ILoopSource> AsyncReaderWriter::NewLoopSource()
{
    auto pSource = Async::APCLessWaitable::GetLoopSource();
    return pSource;
}
// Returns the transaction to its idle state: both staged views are dropped and the underlying
// waitable is reset. Does nothing while an operation is still in progress.
void AsyncReaderWriter::Reset()
{
    if (!this->bInProgress)
    {
        AuResetMember(this->pReadView);
        AuResetMember(this->pWriteView);

        Async::APCLessWaitable::Reset();
    }
}
// Stores the base offset that DispatchFrame() adds to each request offset before calling a
// seeking reader/writer.
//
// NOTE(review): the unsigned argument is assigned to iBaseOffset, which DispatchFrame() compares
// against zero as a signed quantity - confirm how values above the signed range should be treated.
void AsyncReaderWriter::SetBaseOffset(AuUInt64 uBaseOffset)
{
    this->iBaseOffset = uBaseOffset;
}
// Forwards the attach request to the underlying Async::APCLessWaitable and returns its result.
bool AsyncReaderWriter::TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
{
    const bool bAttached = Async::APCLessWaitable::TryAttachToCompletionGroup(pCompletionGroup);
    return bAttached;
}
// Exposes this object through its ICompletionGroupWorkHandle interface. The returned pointer is
// non-owning and refers to this very object.
CompletionGroup::ICompletionGroupWorkHandle *AsyncReaderWriter::ToCompletionGroupHandle()
{
    CompletionGroup::ICompletionGroupWorkHandle *pHandle = this;
    return pHandle;
}
// Returns the completion group reported by the underlying Async::APCLessWaitable.
AuSPtr<CompletionGroup::ICompletionGroup> AsyncReaderWriter::GetCompletionGroup()
{
    auto pGroup = Async::APCLessWaitable::GetCompletionGroup();
    return pGroup;
}
// File-local helper: yields the worker id used by the factories below when the caller does not
// supply one. Delegates to Async::GetAuxWorkerPoolAndRegister().
static AuWorkerPId GetAuxWorkerPool()
{
    AuWorkerPId auxPool = Async::GetAuxWorkerPoolAndRegister();
    return auxPool;
}
// Creates an async transaction that services StartRead() through a sequential IStreamReader.
//
// pStreamReader: required; a null pointer makes the factory return an empty pointer.
// workers:       worker to run the transfer on; defaults to the auxiliary worker pool.
//
// Returns an empty pointer when the argument check or the allocation fails.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
{
    SysCheckArgNotNull(pStreamReader, {});

    auto pObject = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pObject, {});

    auto workerPool = workers.ValueOr(GetAuxWorkerPool());
    pObject->pStreamReader = pStreamReader;
    pObject->workers = workerPool;

    return pObject;
}
// Creates an async transaction that services StartRead() through an ISeekingReader, i.e. each
// request is issued as an ArbitraryRead at base offset + request offset.
//
// pStreamReader: required; a null pointer makes the factory return an empty pointer.
// workers:       worker to run the transfer on; defaults to the auxiliary worker pool.
//
// Returns an empty pointer when the argument check or the allocation fails.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
{
    SysCheckArgNotNull(pStreamReader, {});

    auto pObject = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pObject, {});

    auto workerPool = workers.ValueOr(GetAuxWorkerPool());
    pObject->pStreamReaderEx = pStreamReader;
    pObject->workers = workerPool;

    return pObject;
}
// Creates an async transaction that services StartWrite() through a sequential IStreamWriter.
//
// pStreamWriter: required; a null pointer makes the factory return an empty pointer.
// workers:       worker to run the transfer on; defaults to the auxiliary worker pool.
//
// Returns an empty pointer when the argument check or the allocation fails.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<AuWorkerPId> workers)
{
    SysCheckArgNotNull(pStreamWriter, {});

    auto pObject = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pObject, {});

    auto workerPool = workers.ValueOr(GetAuxWorkerPool());
    pObject->pStreamWriter = pStreamWriter;
    pObject->workers = workerPool;

    return pObject;
}
// Creates an async transaction that services StartWrite() through an ISeekingWriter, i.e. each
// request is issued as an ArbitraryWrite at base offset + request offset.
//
// pStreamWriter: required; a null pointer makes the factory return an empty pointer.
// workers:       worker to run the transfer on; defaults to the auxiliary worker pool.
//
// Returns an empty pointer when the argument check or the allocation fails.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers)
{
    SysCheckArgNotNull(pStreamWriter, {});

    auto pObject = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pObject, {});

    auto workerPool = workers.ValueOr(GetAuxWorkerPool());
    pObject->pStreamWriterEx = pStreamWriter;
    pObject->workers = workerPool;

    return pObject;
}
// Creates an async transaction backed by both a seeking reader and a seeking writer, so the same
// object can service StartRead() and StartWrite().
//
// pStreamReader: required seeking reader.
// pStreamWriter: required seeking writer (validated first).
// workers:       worker to run the transfer on; defaults to the auxiliary worker pool.
//
// Returns an empty pointer when either argument is null or the allocation fails.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingPair(const AuSPtr<ISeekingReader> &pStreamReader,
                                                                            const AuSPtr<ISeekingWriter> &pStreamWriter,
                                                                            AuOptional<Aurora::Async::WorkerPId_t> workers)
{
    SysCheckArgNotNull(pStreamWriter, {});
    SysCheckArgNotNull(pStreamReader, {});

    auto pObject = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pObject, {});

    auto workerPool = workers.ValueOr(GetAuxWorkerPool());
    pObject->pStreamReaderEx = pStreamReader;
    pObject->pStreamWriterEx = pStreamWriter;
    pObject->workers = workerPool;

    return pObject;
}
}