// AuroraRuntime/Source/IO/AuIOPipeProcessor.hpp
// (source-viewer metadata: 129 lines, 3.4 KiB, C++)

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOPipeProcessor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
#include <Aurora/Utility/ThroughputCalculator.hpp>
namespace Aurora::IO
{
// Forward declarations for types that this header only uses through pointers
// or smart pointers before (or without) their full definition.
// IOProcessor is not defined in this header; it is only held by raw pointer in IOPipeProcessor.
struct IOProcessor;
// IOPipeWork is referenced by pointer in IOWorkStart / IOWorkEnd ahead of its definition below.
struct IOPipeWork;
// IOPipeProcessor is referenced through AuSPtr in IOPipeWork ahead of its definition below.
struct IOPipeProcessor;
// Processor work unit (IIOProcessorWorkUnit) that carries a raw back-pointer to
// an IOPipeWork. IOPipeWork embeds one instance by value as its `startCallback`
// member.
// NOTE(review): presumably OnRun() starts the parent pipe on the processor
// thread; the definitions are not in this header - confirm against the .cpp.
struct IOWorkStart : IIOProcessorWorkUnit
{
    // `parent` is the pipe work item this unit acts on.
    IOWorkStart(IOPipeWork *parent);

    // IIOProcessorWorkUnit callbacks.
    void OnRun() override;
    void OnCanceled() override;

    // Raw pointer back to the pipe work item; null until a constructor sets it.
    IOPipeWork *parent { nullptr };
};
// Processor work unit (IIOProcessorWorkUnit) that carries a raw back-pointer to
// an IOPipeWork. IOPipeWork embeds one instance by value as its `endCallback`
// member.
// NOTE(review): presumably OnRun() terminates the parent pipe on the processor
// thread; the definitions are not in this header - confirm against the .cpp.
struct IOWorkEnd : IIOProcessorWorkUnit
{
    // `parent` is the pipe work item this unit acts on.
    IOWorkEnd(IOPipeWork *parent);

    // IIOProcessorWorkUnit callbacks.
    void OnRun() override;
    void OnCanceled() override;

    // Raw pointer back to the pipe work item; null until a constructor sets it.
    IOPipeWork *parent { nullptr };
};
// A single pipe work item. Implements IIOPipeWork and IIOEventListenerFunctional,
// and derives from AuEnableSharedFromThis<IIOPipeWork>.
//
// It is constructed from a shared pointer to its IOPipeProcessor plus either an
// AIO request or a basic request; the two Prepare*/...Pump pairs below mirror
// those two request flavours.
// NOTE(review): the member functions are defined outside this header, so the
// per-member notes below describe declarations only; behavioural remarks are
// marked as assumptions to confirm against the .cpp.
struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis<IIOPipeWork>
{
    // One constructor per request flavour (asynchronous-transaction vs. basic).
    IOPipeWork(const AuSPtr<IOPipeProcessor> &pParent, const IOPipeRequestAIO &request);
    IOPipeWork(const AuSPtr<IOPipeProcessor> &pParent, const IOPipeRequestBasic &request);

    // Processor item handle.
    // NOTE(review): presumably the registration of this work with the IO processor - confirm.
    AuSPtr<IIOProcessorItem> pWatch;

    // Overrides of inherited tick/completion callbacks (base declarations are not visible here).
    void Tick_FrameEpilogue() override;
    void Tick_Any() override;
    void OnFailureCompletion() override;
    void OnNominalCompletion() override;

    // Overrides of the inherited control / statistics interface.
    // `override` already implies virtual, so the keyword is not repeated.
    bool Start() override;
    bool End() override;
    AuInt64 GetLastTickMS() override;
    AuInt64 GetStartTickMS() override;
    AuUInt64 GetBytesProcessed() override;
    AuUInt64 GetBytesProcessedInterframe() override;
    double GetPredictedThroughput() override;

    // NOTE(review): by name these are the bodies invoked from the IO processor's
    // thread (see IOWorkStart / IOWorkEnd) - confirm against the .cpp.
    void RunOnThread();
    void TerminateOnThread(bool bError = false);

    // INIT
    void PrepareStream();
    void PrepareAsync();

    // PUMP
    void AsyncPump();
    void StreamPump();

    // END/INIT
    void ReadNext();
    void ReadNextAsync();

    AuUInt32 TryPump();

    AuMemoryViewWrite nextWriteAsync_;
    IOPipeRequest request {};
    bool bShouldReadNext {false};

    bool IsAtRequestedEnd();
    AuByteBuffer *GetBuffer();

    AuSPtr<Protocol::IProtocolStack> pProtocolStack;

private:
    // NOTE(review): presumably set from the constructors' pParent argument - confirm.
    AuSPtr<IOPipeProcessor> parent_;

    // Deliberately a struct, not a union: every member below has its own storage.
    // "mutex" in the original note presumably meant "mutually exclusive", i.e. only
    // the members matching the request flavour are used - confirm against the .cpp.
    // NOTE(review): anonymous structs are a compiler extension in C++ (only
    // anonymous unions are standard); the wrapper is kept as-is.
    struct /*not a union. the following members are mutually exclusive*/
    {
        IOPipeInputData input_;
        AuSPtr<IAsyncTransaction> pAsyncTransaction_;
        AuSPtr<Adapters::IAsyncStreamAdapter> pAsyncAdapter_;
        AuSPtr<Adapters::IAsyncStreamReader> pAsyncStreamReader_;
    };

    IOPipeCallback output;

    // Work units embedded by value; each holds a raw pointer back to an IOPipeWork.
    IOWorkStart startCallback;
    IOWorkEnd endCallback;

    // NOTE(review): stored unsigned while GetStartTickMS() returns AuInt64.
    AuUInt64 iStartTickMS_ {};
    bool bActive {true};

    AuUInt32 uBufferSize_ {};
    AuUInt32 uFrameCap_ {};

    AuUInt uBytesWritten_ {};
    AuUInt uBytesWrittenLimit_ {};
    AuUInt uBytesWrittenTarget_ {};

    AuByteBuffer buffer_;
    Aurora::Utility::ThroughputCalculator throughput_;
    AuUInt bytesProcessedInterframe_ {};

    bool bWritingAheadLowLatency {};
    bool bWritingAheadIOUOneTerminate {};
};
// IIOPipeProcessor implementation. Declares the two factory entry points that
// return pipe work items as AuSPtr<IIOPipeWork>, one per request flavour, and
// keeps a raw pointer to the IOProcessor it was constructed with.
struct IOPipeProcessor : IIOPipeProcessor
{
    // `parent` is the IO processor this pipe processor belongs to.
    IOPipeProcessor(IOProcessor *parent);

    // `override` already implies virtual, so the keyword is not repeated.
    AuSPtr<IIOPipeWork> NewBasicPipe(const IOPipeRequestBasic &request) override;
    AuSPtr<IIOPipeWork> NewAIOPipe(const IOPipeRequestAIO &request) override;

    // Raw pointer to the IO processor. Default-initialised to null, matching the
    // other raw pointer members in this header, so it is never indeterminate even
    // if a constructor path does not assign it.
    IOProcessor *parent_ {};
};
}