Reece
44108a322e
[+] TTYConsole::GetPaddingTopOfLog,GetPaddingHeadOfLog,GetPaddingTopOfLog [+ set variants] [+] IO::IOYield() [+] IO::IAsyncTransaction::Failed,GetOSErrorCode() [+] IByteBufferStreamPair [+] IIOBufferedInterceptor [+] IIOBufferedProcessor [+] IIOEventListener [+] IIOPipeEventListener [+] IIOProcessorEventListener [+] IIOProcessorManualInvoker [+] IIOWaitableIOLoopSource [+] IIOWaitableIOTimer [+] IIOWaitableItem [+] IIOWaitableTickLimiter [+] IOAdapterAsyncStream [+] IOAdapterByteBuffer [+] IOAdapterCompression [+] IOAdapterSeeking [*] Cleanup CpuInfo.Linux.cpp [*] Fixup async threadpool some more [*] LSTimer.NT.cpp updates timer object on tick state update, akin to Linux
130 lines
4.1 KiB
C++
130 lines
4.1 KiB
C++
/***
|
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
|
|
|
File: IOBufferedProcessor.cpp
|
|
Date: 2022-6-6
|
|
Author: Reece
|
|
***/
|
|
#include <Source/RuntimeInternal.hpp>
|
|
#include <Aurora/IO/IOExperimental.hpp>
|
|
#include "IOBufferedProcessor.hpp"
|
|
|
|
namespace Aurora::IO
|
|
{
|
|
struct IOBufferedProcessor : IIOBufferedProcessor
|
|
{
|
|
AuSPtr<IStreamReader> source;
|
|
AuSPtr<IStreamWriter> drain;
|
|
AuSPtr<IIOPipeInterceptor> processor;
|
|
AuUInt32 bufferSize {};
|
|
AuByteBuffer buffer;
|
|
|
|
AU_DEFINE_FOR_VA(IOBufferedProcessor,
|
|
(AU_DEFINE_CTOR_VA, // initializer-list-like ctor (extending a struct or adding a ctor will break initializer lists)
|
|
AU_DEFINE_THIS_MOVE_CTOR_VA, // add move `Object(Object &&)`
|
|
AU_DEFINE_EQUALS_VA, // add equals operator
|
|
AU_DEFINE_MOVE_VA, // add move assignment operator
|
|
AU_DEFINE_COPY_VA), // add copy assignment operator
|
|
(source, drain, processor, bufferSize));
|
|
|
|
|
|
AuUInt32 TryProcessBuffered() override;
|
|
AuUInt32 GetRawBytesBuffered() override;
|
|
AuUInt32 GetRawBytesLimit() override;
|
|
|
|
AuUInt32 TryPump();
|
|
};
|
|
|
|
AuUInt32 IOBufferedProcessor::TryProcessBuffered()
|
|
{
|
|
if (this->buffer.IsEmpty())
|
|
{
|
|
this->buffer.Allocate(this->bufferSize);
|
|
this->buffer.flagCircular = true; // !!!
|
|
}
|
|
|
|
if (this->buffer.IsEmpty())
|
|
{
|
|
return TryPump();
|
|
}
|
|
|
|
AuUInt canBuffer = this->buffer.RemainingWrite();
|
|
canBuffer = AuMin(canBuffer, AuUInt((this->buffer.length + this->buffer.base) - this->buffer.writePtr));
|
|
|
|
AuUInt read {};
|
|
try
|
|
{
|
|
if (this->source->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) !=
|
|
AuIO::EStreamError::eErrorNone)
|
|
{
|
|
return TryPump();
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
|
|
this->buffer.writePtr += read;
|
|
|
|
if (this->buffer.writePtr == this->buffer.base + this->buffer.length)
|
|
{
|
|
this->buffer.writePtr = this->buffer.base;
|
|
}
|
|
|
|
return TryPump();
|
|
}
|
|
|
|
AuUInt32 IOBufferedProcessor::TryPump()
|
|
{
|
|
AuUInt bytesProcessedTotal {};
|
|
AuUInt bytesProcessed {};
|
|
|
|
do
|
|
{
|
|
AuUInt canRead = this->buffer.RemainingBytes();
|
|
canRead = AuMin<AuUInt>(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr);
|
|
|
|
try
|
|
{
|
|
if (!this->processor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->drain))
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
SysPushErrorCatch();
|
|
}
|
|
|
|
this->buffer.readPtr += bytesProcessed;
|
|
bytesProcessedTotal += bytesProcessed;
|
|
|
|
if (this->buffer.readPtr == this->buffer.base + this->buffer.length)
|
|
{
|
|
this->buffer.readPtr = this->buffer.base;
|
|
}
|
|
}
|
|
while (AuExchange(bytesProcessed, 0));
|
|
|
|
return bytesProcessedTotal;
|
|
}
|
|
|
|
AuUInt32 IOBufferedProcessor::GetRawBytesBuffered()
|
|
{
|
|
return this->buffer.RemainingBytes();
|
|
}
|
|
|
|
AuUInt32 IOBufferedProcessor::GetRawBytesLimit()
|
|
{
|
|
return this->bufferSize;
|
|
}
|
|
|
|
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &source,
|
|
const AuSPtr<IIOPipeInterceptor> &processor,
|
|
const AuSPtr<IStreamWriter> &drain,
|
|
AuUInt32 bufferSize)
|
|
{
|
|
return AuMakeShared<IOBufferedProcessor>(source, drain, processor, bufferSize);
|
|
}
|
|
} |