/*** Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: AuIOAdapterAsyncDelegators.cpp Date: 2024-2-24 Author: Reece ***/ #include #include "AuIOAdapterAsyncDelegators.hpp" #include #include namespace Aurora::IO::Adapters { void AsyncReaderWriter::DispatchFrame(ProcessInfo &info) { try { if (this->iBaseOffset + AuInt64(this->uLastOffset) < 0) { this->eStreamError = EStreamError::eErrorOutOfBounds; } else if (this->pReadView) { if (this->pStreamWriter) { this->eStreamError = this->pStreamWriter->Write(AuMemoryViewStreamRead(*this->pReadView, this->uLastLength)); } else if (this->pStreamWriterEx) { this->eStreamError = this->pStreamWriterEx->ArbitraryWrite(this->iBaseOffset + this->uLastOffset, AuMemoryViewStreamRead(*this->pReadView, this->uLastLength)); } } else if (this->pWriteView) { if (this->pStreamReader) { this->eStreamError = this->pStreamReader->Read(AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength)); } else if (this->pStreamReaderEx) { this->eStreamError = this->pStreamReaderEx->ArbitraryRead(this->iBaseOffset + this->uLastOffset, AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength)); } } } catch (...) { this->eStreamError = EStreamError::eErrorGenericFault; } this->bInProgress = false; this->SignalComplete(); } void AsyncReaderWriter::OnFailure() { this->bInProgress = false; this->Reset(); } bool AsyncReaderWriter::StartRead(AuUInt64 uOffset, const AuSPtr &memoryView) { if (!memoryView) { SysPushErrorArg(); return false; } if (this->bInProgress) { return false; } if (!this->pStreamReaderEx && !this->pStreamReader) { return false; } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); this->bInProgress = true; this->uLastOffset = uOffset; this->pWriteView = memoryView; auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast(this->SharedFromThis())); if (!pThat) { return false; } return pThat->Dispatch(); } bool AsyncReaderWriter::StartWrite(AuUInt64 uOffset, const AuSPtr &memoryView) { if (!memoryView) { SysPushErrorArg(); return false; } if (this->bInProgress) { return false; } if (!this->pStreamWriterEx && !this->pStreamWriter) { return false; } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); this->bInProgress = true; this->uLastOffset = uOffset; this->pReadView = memoryView; auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast(this->SharedFromThis())); if (!pThat) { return false; } return pThat->Dispatch(); } AuUInt32 AsyncReaderWriter::GetLastPacketLength() { return this->uLastLength; } void AsyncReaderWriter::OnOriginThreadComplete() { if (this->pSubscriber) { this->pSubscriber->OnAsyncFileOpFinished(this->uLastOffset, this->GetLastPacketLength()); } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); } bool AsyncReaderWriter::Complete() { Async::APCLessWaitable::CheckLocal(); return Async::APCLessWaitable::HasBeenSignaled(); } bool AsyncReaderWriter::HasFailed() { return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError != EStreamError::eErrorNone; } bool AsyncReaderWriter::HasCompleted() { return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError == EStreamError::eErrorNone; } AuUInt AsyncReaderWriter::GetOSErrorCode() { return this->HasFailed() ? AuUInt(this->eStreamError) : 0; } void AsyncReaderWriter::SetCallback(const AuSPtr &pSubscriber) { this->pSubscriber = pSubscriber; } bool AsyncReaderWriter::Wait(AuUInt32 uTimeout) { return NewLoopSource()->WaitOn(uTimeout); } AuSPtr AsyncReaderWriter::NewLoopSource() { return Async::APCLessWaitable::GetLoopSource(); } void AsyncReaderWriter::Reset() { if (this->bInProgress) { return; } AuResetMember(this->pReadView); AuResetMember(this->pWriteView); Async::APCLessWaitable::Reset(); } void AsyncReaderWriter::SetBaseOffset(AuUInt64 uBaseOffset) { this->iBaseOffset = uBaseOffset; } bool AsyncReaderWriter::TryAttachToCompletionGroup(const AuSPtr &pCompletionGroup) { return Async::APCLessWaitable::TryAttachToCompletionGroup(pCompletionGroup); } CompletionGroup::ICompletionGroupWorkHandle *AsyncReaderWriter::ToCompletionGroupHandle() { return this; } AuSPtr AsyncReaderWriter::GetCompletionGroup() { return Async::APCLessWaitable::GetCompletionGroup(); } static AuWorkerPId GetAuxWorkerPool() { return Async::GetAuxWorkerPoolAndRegister(); } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamReader(const AuSPtr &pStreamReader, AuOptional workers) { SysCheckArgNotNull(pStreamReader, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamReader = pStreamReader; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamSeekingReader(const AuSPtr &pStreamReader, AuOptional workers) { SysCheckArgNotNull(pStreamReader, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamReaderEx = pStreamReader; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamWriter(const AuSPtr &pStreamWriter, AuOptional workers) { SysCheckArgNotNull(pStreamWriter, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamWriter = pStreamWriter; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr &pStreamWriter, AuOptional workers) { SysCheckArgNotNull(pStreamWriter, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamWriterEx = pStreamWriter; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } AUKN_SYM AuSPtr NewAsyncTransactionFromStreamSeekingPair(const AuSPtr &pStreamReader, const AuSPtr &pStreamWriter, AuOptional workers) { SysCheckArgNotNull(pStreamWriter, {}); SysCheckArgNotNull(pStreamReader, {}); auto pObject = AuMakeShared(); SysCheckNotNullMemory(pObject, {}); pObject->pStreamReaderEx = pStreamReader; pObject->pStreamWriterEx = pStreamWriter; pObject->workers = workers.ValueOr(GetAuxWorkerPool()); return pObject; } }