/***
    Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.

    File: IOPipeProcessor.hpp
    Date: 2022-6-6
    Author: Reece
***/
#pragma once

// NOTE(review): this copy of the file has lost its include target, and the
// template argument lists (e.g. on AuSPtr and AuEnableSharedFromThis) appear
// to have been stripped too. Restore both from the upstream source before
// building; nothing below attempts to guess them.
#include

namespace Aurora::IO
{
    // Forward declarations; IOProcessor is only used by pointer in this header.
    struct IOProcessor;
    struct IOPipeWork;
    struct IOPipeProcessor;

    // Work unit carrying a raw pointer to an IOPipeWork.
    // IOPipeWork stores one of these by value (startCallback), so `parent`
    // presumably points back at the enclosing IOPipeWork — confirm in the .cpp.
    struct IOWorkStart : IIOProcessorWorkUnit
    {
        IOWorkStart(IOPipeWork *parent);

        // IIOProcessorWorkUnit callbacks.
        void OnRun() override;
        void OnCanceled() override;

        // Raw pointer, null by default; lifetime is not managed by this struct.
        IOPipeWork *parent {};
    };

    // Counterpart of IOWorkStart with the same shape.
    // IOPipeWork stores one of these by value (endCallback); presumably used to
    // finish/tear down the pipe — confirm in the .cpp.
    struct IOWorkEnd : IIOProcessorWorkUnit
    {
        IOWorkEnd(IOPipeWork *parent);

        // IIOProcessorWorkUnit callbacks.
        void OnRun() override;
        void OnCanceled() override;

        // Raw pointer, null by default; lifetime is not managed by this struct.
        IOPipeWork *parent {};
    };

    // A single pipe job. Implements IIOPipeWork and IIOEventListenerFunctional
    // and can hand out shared pointers to itself (AuEnableSharedFromThis).
    // It can be constructed from either an AIO request or a basic request.
    struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis
    {
        // One constructor per request flavour; pParent is kept in parent_
        // (presumably — the definitions are not in this header).
        IOPipeWork(const AuSPtr &pParent, const IOPipeRequestAIO &request);
        IOPipeWork(const AuSPtr &pParent, const IOPipeRequestBasic &request);

        // NOTE(review): looks like the watch/registration handle for this
        // listener — element type was lost in this copy; confirm upstream.
        AuSPtr pWatch;

        // Base-class event callbacks (declared override, not re-marked virtual).
        void Tick_FrameEpilogue() override;
        void Tick_Any() override;
        void OnFailureCompletion() override;
        void OnNominalCompletion() override;

        // Base-class control and statistics interface.
        virtual bool Start() override;
        virtual bool End() override;

        virtual AuInt64 GetLastTickMS() override;
        virtual AuInt64 GetStartTickMS() override;
        virtual AuUInt64 GetBytesProcessed() override;
        virtual double GetPredictedThroughput() override;

        // Names suggest these must be invoked on the owning IO thread —
        // TODO confirm against the implementation.
        void RunOnThread();
        // bError defaults to false, i.e. a plain call is a non-error termination.
        void TerminateOnThread(bool bError = false);

        // INIT
        void PrepareStream();
        void PrepareAsync();

        // PUMP
        void AsyncPump();
        void StreamPump();

        // END/INIT
        void ReadNext();
        void ReadNextAsync();

        AuUInt32 TryPump();

        // Public despite the trailing underscore used for private members below.
        AuMemoryViewWrite nextWriteAsync_;

        // Value-initialised request description.
        // NOTE(review): both constructors take a more specific request type;
        // how it is converted into IOPipeRequest is not visible here.
        IOPipeRequest request {};

        // Defaults to false.
        bool bShouldReadNext {false};

        bool IsAtRequestedEnd();

        // Returns a raw pointer to a byte buffer; whether it is buffer_ or a
        // caller-supplied one cannot be told from this header.
        AuByteBuffer *GetBuffer();

        AuSPtr pProtocolStack;

    private:
        AuSPtr parent_;

        // Unnamed struct member: an ordinary struct, so every member below has
        // its own storage. The original note calls them "mutex", presumably
        // meaning mutually exclusive (only one input source in use per pipe) —
        // confirm in the .cpp.
        // NOTE(review): an unnamed struct with no declarator is a compiler
        // extension in C++, not ISO C++.
        struct /*not a union. the following members are mutex*/
        {
            IOPipeInputData input_;
            AuSPtr pAsyncTransaction_;
            AuSPtr pAsyncAdapter_;
            AuSPtr pAsyncStreamReader_;
        };

        IOPipeCallback output;

        // Embedded work units; see IOWorkStart / IOWorkEnd.
        IOWorkStart startCallback;
        IOWorkEnd endCallback;

        // NOTE(review): stored unsigned while GetStartTickMS() returns AuInt64,
        // and the 'i' prefix suggests a signed value — confirm intended signedness.
        AuUInt64 iStartTickMS_ {};

        // Defaults to true.
        bool bActive {true};

        // Sizes and counters, all zero-initialised. Units are presumably bytes
        // for the uBytes* members — TODO confirm uFrameCap_'s unit.
        AuUInt32 uBufferSize_ {};
        AuUInt32 uFrameCap_ {};
        AuUInt uBytesWritten_ {};
        AuUInt uBytesWrittenLimit_ {};
        AuUInt uBytesWrittenTarget_ {};

        AuByteBuffer buffer_;
        Utility::ThroughputCalculator throughput_;
    };

    // IIOPipeProcessor implementation: a factory for pipes, one entry point per
    // request flavour. Holds a raw pointer to the IOProcessor it was built with.
    struct IOPipeProcessor : IIOPipeProcessor
    {
        IOPipeProcessor(IOProcessor *parent);

        // Each returns a shared pointer (pointee type lost in this copy;
        // presumably the IIOPipeWork interface — confirm upstream).
        virtual AuSPtr NewBasicPipe(const IOPipeRequestBasic &request) override;
        virtual AuSPtr NewAIOPipe(const IOPipeRequestAIO &request) override;

        // Raw pointer; this struct does not manage the IOProcessor's lifetime.
        IOProcessor *parent_;
    };
}