AuroraRuntime/Source/IO/Adapters/AuIOAdapterAsyncDelegators.cpp
J Reece Wilson 2371794d47 [+] Linux Exception Handlers and AuExit::ETriggerLevel::eSigQuitNow
[*] Linux stability fixes
[+] AuProcAddresses.UNIX.[cpp/hpp]
2024-03-05 13:55:21 +00:00

273 lines
8.7 KiB
C++

/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOAdapterAsyncDelegators.cpp
Date: 2024-2-24
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuIOAdapterAsyncDelegators.hpp"
#include <Source/IO/Async/AuIOThreadPool.hpp>
#include <Source/IO/Async/AuIOAPCLessWaitable.hpp>
namespace Aurora::IO::Adapters
{
// Adapts a blocking stream reader or writer (sequential or seeking) to the
// IAsyncTransaction interface. Each StartRead/StartWrite wraps this object in a
// work item on `workers`; DispatchFrame then performs the stream operation and
// signals the APCLessWaitable base.
//
// Exactly one of the four stream pointers is assigned by the factory functions
// in this file. Only one operation may be in flight at a time.
//
// NOTE(review): bInProgress is a plain bool that is written from DispatchFrame
// and read from the Start*/Reset callers; confirm which threads are involved
// before relying on it as a cross-thread guard.
struct AsyncReaderWriter : IAsyncTransaction,
                           Async::APCLessWaitable,
                           AuAsync::IWorkItemHandler
{
    // Backing streams; StartRead needs one of the readers, StartWrite one of the writers.
    AuSPtr<IStreamReader> pStreamReader;
    AuSPtr<ISeekingReader> pStreamReaderEx;
    AuSPtr<IStreamWriter> pStreamWriter;
    AuSPtr<ISeekingWriter> pStreamWriterEx;

    // View of the pending operation: pReadView is the source of a write,
    // pWriteView is the destination of a read. At most one is set.
    AuSPtr<Memory::MemoryViewRead> pReadView;
    AuSPtr<Memory::MemoryViewWrite> pWriteView;

    // Worker that the work items are created on.
    AuWorkerPId workers;

    // Work item body: performs the pending stream operation, stores its result
    // in eStreamError, then clears the in-progress flag and signals completion.
    // Any exception thrown by the stream is mapped to eErrorGenericFault.
    void DispatchFrame(ProcessInfo &info) override
    {
        try
        {
            if (this->pReadView)
            {
                // Pending write: the caller's readable view is the data source.
                if (this->pStreamWriter)
                {
                    this->eStreamError = this->pStreamWriter->Write(AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
                }
                else if (this->pStreamWriterEx)
                {
                    this->eStreamError = this->pStreamWriterEx->ArbitraryWrite(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
                }
            }
            else if (this->pWriteView)
            {
                // Pending read: the caller's writable view is the destination.
                if (this->pStreamReader)
                {
                    this->eStreamError = this->pStreamReader->Read(AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
                }
                else if (this->pStreamReaderEx)
                {
                    this->eStreamError = this->pStreamReaderEx->ArbitraryRead(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
                }
            }
        }
        catch (...)
        {
            this->eStreamError = EStreamError::eErrorGenericFault;
        }

        // Clear the flag before signalling so that an observer of the signal can start the next operation.
        this->bInProgress = false;
        this->SignalComplete();
    }

    // Work item failure callback: drops the in-progress state and resets the transaction.
    void OnFailure() override
    {
        this->bInProgress = false;
        this->Reset();
    }

    // Schedules an asynchronous read into memoryView.
    // uOffset is added to the base offset when the reader is a seeking reader.
    // Returns false if memoryView is null, an operation is already in progress,
    // no reader is attached, or the work item could not be created/dispatched.
    bool StartRead(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewWrite> &memoryView) override
    {
        if (!memoryView)
        {
            SysPushErrorArg();
            return false;
        }

        if (this->bInProgress)
        {
            return false;
        }

        if (!this->pStreamReaderEx && !this->pStreamReader)
        {
            return false;
        }

        AuResetMember(this->pReadView);
        AuResetMember(this->pWriteView);

        this->bInProgress = true;
        this->uLastOffset = uOffset;
        this->uLastLength = 0; // do not report the previous operation's length
        this->pWriteView = memoryView;

        return this->DispatchSelf();
    }

    // Schedules an asynchronous write of memoryView.
    // uOffset is added to the base offset when the writer is a seeking writer.
    // Returns false if memoryView is null, an operation is already in progress,
    // no writer is attached, or the work item could not be created/dispatched.
    bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) override
    {
        if (!memoryView)
        {
            SysPushErrorArg();
            return false;
        }

        if (this->bInProgress)
        {
            return false;
        }

        if (!this->pStreamWriterEx && !this->pStreamWriter)
        {
            return false;
        }

        AuResetMember(this->pReadView);
        AuResetMember(this->pWriteView);

        this->bInProgress = true;
        this->uLastOffset = uOffset;
        this->uLastLength = 0; // do not report the previous operation's length
        this->pReadView = memoryView;

        return this->DispatchSelf();
    }

    // Length reported by the stream for the most recent operation, narrowed to 32 bits.
    AuUInt32 GetLastPacketLength() override
    {
        return AuUInt32(this->uLastLength);
    }

    // Notifies the subscriber (if any) with the absolute offset and length of
    // the finished operation, then releases the caller's memory view.
    void OnOriginThreadComplete() override
    {
        if (this->pSubscriber)
        {
            this->pSubscriber->OnAsyncFileOpFinished(this->uBaseOffset + this->uLastOffset, this->GetLastPacketLength());
        }

        AuResetMember(this->pReadView);
        AuResetMember(this->pWriteView);
    }

    // Runs the waitable's local check and reports whether it has been signaled.
    bool Complete() override
    {
        Async::APCLessWaitable::CheckLocal();
        return Async::APCLessWaitable::HasBeenSignaled();
    }

    // True once signaled with a stream error other than eErrorNone.
    bool HasFailed() override
    {
        return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError != EStreamError::eErrorNone;
    }

    // True once signaled with eErrorNone.
    bool HasCompleted() override
    {
        return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError == EStreamError::eErrorNone;
    }

    // Returns the EStreamError value as an integer when failed, otherwise 0.
    AuUInt GetOSErrorCode() override
    {
        return this->HasFailed() ? AuUInt(this->eStreamError) : 0;
    }

    // Sets the subscriber notified from OnOriginThreadComplete.
    void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) override
    {
        this->pSubscriber = pSubscriber;
    }

    // Waits on the waitable's loop source with the given timeout.
    // Returns false when no loop source is available.
    bool Wait(AuUInt32 uTimeout) override
    {
        auto pSource = this->NewLoopSource();
        if (!pSource)
        {
            return false;
        }

        return pSource->WaitOn(uTimeout);
    }

    // Returns the loop source of the APCLessWaitable base.
    AuSPtr<IO::Loop::ILoopSource> NewLoopSource() override
    {
        return Async::APCLessWaitable::GetLoopSource();
    }

    // Releases the memory views and resets the waitable.
    // Does nothing while an operation is in progress.
    void Reset() override
    {
        if (this->bInProgress)
        {
            return;
        }

        AuResetMember(this->pReadView);
        AuResetMember(this->pWriteView);
        Async::APCLessWaitable::Reset();
    }

    // Sets the offset that is added to every per-operation offset.
    void SetBaseOffset(AuUInt64 uBaseOffset) override
    {
        this->uBaseOffset = uBaseOffset;
    }

    // The completion group operations below forward to the APCLessWaitable base.
    bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override
    {
        return Async::APCLessWaitable::TryAttachToCompletionGroup(pCompletionGroup);
    }

    CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override
    {
        return this;
    }

    AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup() override
    {
        return Async::APCLessWaitable::GetCompletionGroup();
    }

private:
    // Wraps this object in a work item on `workers` and dispatches it.
    // On failure the pending-operation state set by StartRead/StartWrite is
    // rolled back; otherwise bInProgress would stay set with no work item left
    // to clear it, and every later Start*/Reset call would be rejected.
    bool DispatchSelf()
    {
        auto pWorkItem = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
        if (pWorkItem && pWorkItem->Dispatch())
        {
            return true;
        }

        this->bInProgress = false;
        AuResetMember(this->pReadView);
        AuResetMember(this->pWriteView);
        return false;
    }

    AuMutex mutex; // not referenced by any method of this struct
    AuSPtr<IAsyncFinishedSubscriber> pSubscriber;
    AuUInt64 uBaseOffset {};
    AuUInt64 uLastOffset {};
    AuUInt uLastLength {};
    bool bInProgress {};
    EStreamError eStreamError = EStreamError::eErrorNone;
};
// Default worker for transactions created without an explicit worker id:
// whatever Async::GetAuxWorkerPoolAndRegister() returns.
static AuWorkerPId GetAuxWorkerPool()
{
    AuWorkerPId auxPool = Async::GetAuxWorkerPoolAndRegister();
    return auxPool;
}
// Creates an async transaction that services StartRead through a sequential IStreamReader.
// pStreamReader must not be null; an empty pointer is returned on a null argument or
// allocation failure. When `workers` is empty, the auxiliary worker pool is used.
// Note: GetAuxWorkerPool() is evaluated as the ValueOr argument even when `workers` holds a value.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
{
    SysCheckArgNotNull(pStreamReader, {});

    auto pTransaction = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pTransaction, {});

    pTransaction->pStreamReader = pStreamReader;
    pTransaction->workers = workers.ValueOr(GetAuxWorkerPool());

    return pTransaction;
}
// Creates an async transaction that services StartRead through an ISeekingReader,
// reading at (base offset + per-operation offset).
// pStreamReader must not be null; an empty pointer is returned on a null argument or
// allocation failure. When `workers` is empty, the auxiliary worker pool is used.
// Note: GetAuxWorkerPool() is evaluated as the ValueOr argument even when `workers` holds a value.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
{
    SysCheckArgNotNull(pStreamReader, {});

    auto pTransaction = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pTransaction, {});

    pTransaction->pStreamReaderEx = pStreamReader;
    pTransaction->workers = workers.ValueOr(GetAuxWorkerPool());

    return pTransaction;
}
// Creates an async transaction that services StartWrite through a sequential IStreamWriter.
// pStreamWriter must not be null; an empty pointer is returned on a null argument or
// allocation failure. When `workers` is empty, the auxiliary worker pool is used.
// Note: GetAuxWorkerPool() is evaluated as the ValueOr argument even when `workers` holds a value.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<AuWorkerPId> workers)
{
    SysCheckArgNotNull(pStreamWriter, {});

    auto pTransaction = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pTransaction, {});

    pTransaction->pStreamWriter = pStreamWriter;
    pTransaction->workers = workers.ValueOr(GetAuxWorkerPool());

    return pTransaction;
}
// Creates an async transaction that services StartWrite through an ISeekingWriter,
// writing at (base offset + per-operation offset).
// pStreamWriter must not be null; an empty pointer is returned on a null argument or
// allocation failure. When `workers` is empty, the auxiliary worker pool is used.
// Note: GetAuxWorkerPool() is evaluated as the ValueOr argument even when `workers` holds a value.
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers)
{
    SysCheckArgNotNull(pStreamWriter, {});

    auto pTransaction = AuMakeShared<AsyncReaderWriter>();
    SysCheckNotNullMemory(pTransaction, {});

    pTransaction->pStreamWriterEx = pStreamWriter;
    pTransaction->workers = workers.ValueOr(GetAuxWorkerPool());

    return pTransaction;
}
}