/*** Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuIOAdapterAsyncDelegators.cpp Date: 2024-2-24 Author: Reece ***/ #include #include "AuIOAdapterAsyncDelegators.hpp" #include #include namespace Aurora::IO::Adapters { struct AsyncReaderWriter : IAsyncTransaction, Async::APCLessWaitable, AuAsync::IWorkItemHandler { AuSPtr pStreamReader; AuSPtr pStreamReaderEx; AuSPtr pStreamWriter; AuSPtr pStreamWriterEx; AuSPtr pReadView; AuSPtr pWriteView; AuWorkerPId workers; void DispatchFrame(ProcessInfo &info) override { try { if (this->pReadView) { if (this->pStreamWriter) { this->eStreamError = this->pStreamWriter->Write(AuMemoryViewStreamRead(*this->pReadView, this->uLastLength)); } else if (this->pStreamWriterEx) { this->eStreamError = this->pStreamWriterEx->ArbitraryWrite(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamRead(*this->pReadView, this->uLastLength)); } } else if (this->pWriteView) { if (this->pStreamReader) { this->eStreamError = this->pStreamReader->Read(AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength)); } else if (this->pStreamReaderEx) { this->eStreamError = this->pStreamReaderEx->ArbitraryRead(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength)); } } } catch (...) { this->eStreamError = EStreamError::eErrorGenericFault; } this->bInProgress = false; this->SignalComplete(); } void OnFailure() { this->bInProgress = false; this->Reset(); } bool StartRead(AuUInt64 uOffset, const AuSPtr &memoryView) override { if (!memoryView) { SysPushErrorArg(); return false; } if (this->bInProgress) { return false; } if (!this->pStreamReaderEx && !this->pStreamReader) { return false; } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); this->bInProgress = true; this->uLastOffset = uOffset; this->pWriteView = memoryView; auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast(this->SharedFromThis())); if (!pThat) { return false; } return pThat->Dispatch(); } bool StartWrite(AuUInt64 uOffset, const AuSPtr &memoryView) override { if (!memoryView) { SysPushErrorArg(); return false; } if (this->bInProgress) { return false; } if (!this->pStreamWriterEx && !this->pStreamWriter) { return false; } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); this->bInProgress = true; this->uLastOffset = uOffset; this->pReadView = memoryView; auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast(this->SharedFromThis())); if (!pThat) { return false; } return pThat->Dispatch(); } AuUInt32 GetLastPacketLength() override { return this->uLastLength; } void OnOriginThreadComplete() override { if (this->pSubscriber) { this->pSubscriber->OnAsyncFileOpFinished(this->uBaseOffset + this->uLastOffset, this->GetLastPacketLength()); } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); } bool Complete() override { Async::APCLessWaitable::CheckLocal(); return Async::APCLessWaitable::HasBeenSignaled(); } bool HasFailed() override { return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError != EStreamError::eErrorNone; } bool HasCompleted() override { return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError == EStreamError::eErrorNone; } AuUInt GetOSErrorCode() override { return this->HasFailed() ? AuUInt(this->eStreamError) : 0; } void SetCallback(const AuSPtr &pSubscriber) override { this->pSubscriber = pSubscriber; } bool Wait(AuUInt32 uTimeout) override { return NewLoopSource()->WaitOn(uTimeout); } AuSPtr NewLoopSource() override { return Async::APCLessWaitable::GetLoopSource(); } void Reset() override { if (this->bInProgress) { return; } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); Async::APCLessWaitable::Reset(); } void SetBaseOffset(AuUInt64 uBaseOffset) override { this->uBaseOffset = uBaseOffset; } bool TryAttachToCompletionGroup(const AuSPtr &pCompletionGroup) override { return Async::APCLessWaitable::TryAttachToCompletionGroup(pCompletionGroup); } CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override { return this; } AuSPtr GetCompletionGroup() override { return Async::APCLessWaitable::GetCompletionGroup(); } private: AuMutex mutex; AuSPtr pSubscriber; AuUInt64 uBaseOffset {}; AuUInt64 uLastOffset {}; AuUInt uLastLength {}; bool bInProgress {}; EStreamError eStreamError = EStreamError::eErrorNone; }; static AuWorkerPId GetAuxWorkerPool() { return Async::GetAuxWorkerPoolAndRegister(); } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamReader(const AuSPtr &pStreamReader, AuOptional workers) { SysCheckArgNotNull(pStreamReader, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamReader = pStreamReader; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamSeekingReader(const AuSPtr &pStreamReader, AuOptional workers) { SysCheckArgNotNull(pStreamReader, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamReaderEx = pStreamReader; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamWriter(const AuSPtr &pStreamWriter, AuOptional workers) { SysCheckArgNotNull(pStreamWriter, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamWriter = pStreamWriter; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr &pStreamWriter, AuOptional workers) { SysCheckArgNotNull(pStreamWriter, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamWriterEx = pStreamWriter; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } }