[*] Cleanup/refactor of AuIO

This commit is contained in:
Reece Wilson 2022-08-29 16:46:46 +01:00
parent fa0d46d4c7
commit 401cf09962
32 changed files with 274 additions and 221 deletions

View File

@ -10,6 +10,6 @@
namespace Aurora::IO namespace Aurora::IO
{ {
AUKN_INTERFACE(IAsyncFinishedSubscriber, AUKN_INTERFACE(IAsyncFinishedSubscriber,
AUI_METHOD(void, OnAsyncFileOpFinished, (AuUInt64, offset, AuUInt32, length)) AUI_METHOD(void, OnAsyncFileOpFinished, (AuUInt64, uOffset, AuUInt32, uLength))
); );
} }

View File

@ -24,8 +24,8 @@ namespace Aurora::IO
*/ */
struct IAsyncTransaction struct IAsyncTransaction
{ {
virtual bool StartRead(AuUInt64 offset, const AuSPtr<Memory::MemoryViewWrite> &memoryView) = 0; virtual bool StartRead(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewWrite> &memoryView) = 0;
virtual bool StartWrite(AuUInt64 offset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0; virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0;
/** /**
* @brief Non-blocking is-signaled and call callback poll routine * @brief Non-blocking is-signaled and call callback poll routine
@ -53,12 +53,12 @@ namespace Aurora::IO
* @brief Registers an NT-like APC callback for the IO transaction. * @brief Registers an NT-like APC callback for the IO transaction.
* Can be executed under any Aurora loop subsystem sleep * Can be executed under any Aurora loop subsystem sleep
*/ */
virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) = 0; virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) = 0;
/** /**
* @brief Block for completion * @brief Block for completion
*/ */
virtual bool Wait(AuUInt32 timeout) = 0; virtual bool Wait(AuUInt32 uTimeout) = 0;
/** /**
* @brief Provides a loop source that becomes signaled once the transaction is complete. * @brief Provides a loop source that becomes signaled once the transaction is complete.

View File

@ -19,6 +19,6 @@ namespace Aurora::IO
}; };
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPair(); AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPair();
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPairEx(AuUInt length, bool permitResize); AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPairEx(AuUInt uLength, bool bPermitResize);
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewRingByteBuffer(AuUInt length); AUKN_SYM AuSPtr<IByteBufferStreamPair> NewRingByteBuffer(AuUInt uLength);
} }

View File

@ -16,8 +16,17 @@ namespace Aurora::IO
virtual AuUInt32 GetRawBytesLimit() = 0; virtual AuUInt32 GetRawBytesLimit() = 0;
}; };
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &source, /**
const AuSPtr<IIOPipeInterceptor> &processor, * @brief
const AuSPtr<IStreamWriter> &drain, // TODO: share IOPipeCallback * @param source
AuUInt32 bufferSize); * @param processor
* @param drain
* @param bufferSize
* @deprecated by Aurora::IO::Protocol (tobe AuProtocol:: and AuIOProtocol::)
* @return
*/
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &pSource,
const AuSPtr<IIOPipeInterceptor> &pProcessor,
const AuSPtr<IStreamWriter> &pDrain, // TODO: share IOPipeCallback
AuUInt32 uBufferSize);
} }

View File

@ -12,7 +12,7 @@ namespace Aurora::IO
struct IOPipeData; struct IOPipeData;
AUKN_INTERFACE(IIOPipeEventListener, AUKN_INTERFACE(IIOPipeEventListener,
AUI_METHOD(void, OnPipePartialEvent, (AuUInt, transferred)), AUI_METHOD(void, OnPipePartialEvent, (AuUInt, uTransferred)),
AUI_METHOD(void, OnPipeSuccessEvent, ()), AUI_METHOD(void, OnPipeSuccessEvent, ()),
AUI_METHOD(void, OnPipeFailureEvent, ()) AUI_METHOD(void, OnPipeFailureEvent, ())
); );

View File

@ -55,7 +55,27 @@ namespace Aurora::IO
virtual bool HasItems () = 0; virtual bool HasItems () = 0;
}; };
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessor(bool tickOnly, const AuSPtr<Loop::ILoopQueue> &queue); /**
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id); * @brief Creates an IOProcessor to execute tasks on a given loop queue
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorNoQueue(bool tickOnly); * @param bTickOnly True if kernel yielding on IO objects is disabled in favor of forced TPS.
* Do note that the throughput of a tick will differ per platform. Accept rate,
* throughput, etc may need tweaking, if you wish to drive an IIOProcessor using
* a timer or the manual tick functions.
* @param pQueue
* @return
*/
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessor(bool bTickOnly, const AuSPtr<Loop::ILoopQueue> &pQueue);
/**
* @brief Creates an IOProcessor to execute tasks on the loop queue of a given AuAsync thread id
* @param bTickOnly True if kernel yielding on IO objects is disabled in favor of forced TPS.
* Do note that the throughput of a tick will differ per platform. Accept rate,
* throughput, etc may need tweaking, if you wish to drive an IIOProcessor using
* a timer or the manual tick functions.
* @param pQueue
* @return
*/
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool bTickOnly, Async::WorkerPId_t id);
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorNoQueue(bool bTickOnly);
} }

View File

@ -11,6 +11,9 @@ namespace Aurora::IO
{ {
struct IIOProcessorManualInvoker struct IIOProcessorManualInvoker
{ {
/**
* @brief Used to invoke a tick on an IIOProcessor
*/
virtual void InvokeManualTick() = 0; virtual void InvokeManualTick() = 0;
}; };
} }

View File

@ -15,5 +15,8 @@ namespace Aurora::IO
AUI_METHOD(void, OnIOComplete, ()) AUI_METHOD(void, OnIOComplete, ())
); );
AUKN_SYM AuSPtr<IIOEventListener> DesimplifyIOEventListenerAdapter(const AuSPtr<IIOSimpleEventListener> &interface); /**
* @brief Creates an adapter to convert an object that implements the simple listener interface to the annoying internal listener interface (7 callbacks)
*/
AUKN_SYM AuSPtr<IIOEventListener> DesimplifyIOEventListenerAdapter(const AuSPtr<IIOSimpleEventListener> &pInterface);
} }

View File

@ -12,9 +12,22 @@ namespace Aurora::IO
struct IIOWatachableIOLoopSource : IIOWaitableItem struct IIOWatachableIOLoopSource : IIOWaitableItem
{ {
virtual AuSPtr<Loop::ILoopSource> GetLoopSource() = 0; virtual AuSPtr<Loop::ILoopSource> GetLoopSource() = 0;
virtual AuSPtr<Loop::ILoopSource> SetLoopSource(const AuSPtr<Loop::ILoopSource> &ls) = 0; virtual AuSPtr<Loop::ILoopSource> SetLoopSource(const AuSPtr<Loop::ILoopSource> &pLoopSource) = 0;
}; };
AUKN_SYM AuSPtr<IIOWatachableIOLoopSource> NewWaitableLoopSource(const AuSPtr<Loop::ILoopSource> &ptr); /**
AUKN_SYM AuSPtr<IIOWatachableIOLoopSource> NewWaitableLoopSourceEx(const AuSPtr<Loop::ILoopSource>& ptr, AuUInt32 msTimeout); * @brief Creates an adapter given an ILoopSource for the IO subsystem to wait on
* @param pLoopSource
* @return
*/
AUKN_SYM AuSPtr<IIOWatachableIOLoopSource> NewWaitableLoopSource(const AuSPtr<Loop::ILoopSource> &pLoopSource);
/**
* @brief Creates an adapter given an ILoopSource for the IO subsystem to wait on
* @param pLoopSource
* @param uMsTimeout
* @return
*/
AUKN_SYM AuSPtr<IIOWatachableIOLoopSource> NewWaitableLoopSourceEx(const AuSPtr<Loop::ILoopSource> &pLoopSource,
AuUInt32 uMsTimeout);
} }

View File

@ -9,7 +9,6 @@
namespace Aurora::IO namespace Aurora::IO
{ {
// Unstable and hacky interface // Unstable and hacky interface
// You shouldn't try to implement this yourself // You shouldn't try to implement this yourself
// Defer to IIOWaitableIOLoopSource, IIOWaitableIOTimer, and IIOWaitableTickLimiter for intended impl usage // Defer to IIOWaitableIOLoopSource, IIOWaitableIOTimer, and IIOWaitableTickLimiter for intended impl usage
@ -29,5 +28,4 @@ namespace Aurora::IO
AUI_METHOD(bool, ApplyRateLimit, ()) AUI_METHOD(bool, ApplyRateLimit, ())
); );
} }

View File

@ -46,5 +46,5 @@ namespace Aurora::IO
virtual bool Reset() = 0; virtual bool Reset() = 0;
}; };
AUKN_SYM AuSPtr<IAsyncStreamAdapter> NewAsyncStreamAdapter(const AuSPtr<IAsyncTransaction> &transaction, bool isStream); AUKN_SYM AuSPtr<IAsyncStreamAdapter> NewAsyncStreamAdapter(const AuSPtr<IAsyncTransaction> &pTransaction, bool bIsStream);
} }

View File

@ -9,7 +9,7 @@
namespace Aurora::IO namespace Aurora::IO
{ {
AUKN_SYM AuSPtr<IStreamReader> NewByteBufferReadAdapter(const AuSPtr<Memory::ByteBuffer> &buffer); AUKN_SYM AuSPtr<IStreamReader> NewByteBufferReadAdapter(const AuSPtr<Memory::ByteBuffer> &pBuffer);
AUKN_SYM AuSPtr<ISeekingReader> NewByteBufferLinearSeekableAdapter(const AuSPtr<Memory::ByteBuffer> &buffer); AUKN_SYM AuSPtr<ISeekingReader> NewByteBufferLinearSeekableAdapter(const AuSPtr<Memory::ByteBuffer> &pBuffer);
AUKN_SYM AuSPtr<IStreamWriter> NewByteBufferWriteAdapter(const AuSPtr<Memory::ByteBuffer> &buffer); AUKN_SYM AuSPtr<IStreamWriter> NewByteBufferWriteAdapter(const AuSPtr<Memory::ByteBuffer> &pBuffer);
} }

View File

@ -9,6 +9,6 @@
namespace Aurora::IO namespace Aurora::IO
{ {
AUKN_SYM AuSPtr<IStreamReader> NewCompressionReadAdapter(const AuSPtr<Compression::ICompressionStream> &compresionStream); AUKN_SYM AuSPtr<IStreamReader> NewCompressionReadAdapter(const AuSPtr<Compression::ICompressionStream> &pCompresionStream);
AUKN_SYM AuSPtr<ISeekingReader> NewCompressionSeekingAdapter(const AuSPtr<Compression::ICompressionStream> &compresionStream); AUKN_SYM AuSPtr<ISeekingReader> NewCompressionSeekingAdapter(const AuSPtr<Compression::ICompressionStream> &pCompresionStream);
} }

View File

@ -9,5 +9,5 @@
namespace Aurora::IO namespace Aurora::IO
{ {
AUKN_SYM AuSPtr<IStreamReader> NewSeekingReadAdapter(const AuSPtr<ISeekingReader> &reader); AUKN_SYM AuSPtr<IStreamReader> NewSeekingReadAdapter(const AuSPtr<ISeekingReader> &pReader);
} }

View File

@ -18,13 +18,13 @@ namespace Aurora::IO
struct struct
{ {
AuSPtr<IIOBufferedStreamAvailable> onData; AuSPtr<IIOBufferedStreamAvailable> pOnData;
} handleBufferedStream; } handleBufferedStream;
struct struct
{ {
AuSPtr<IIOPipeInterceptor> intercepter; AuSPtr<IIOPipeInterceptor> pIntercepter;
AuSPtr<IStreamWriter> writer; AuSPtr<IStreamWriter> pWriter;
bool bFlushWriter {true}; bool bFlushWriter {true};
} forwardStream; } forwardStream;

View File

@ -16,17 +16,17 @@ namespace Aurora::IO
/** /**
* @brief IO trigger * @brief IO trigger
*/ */
AuSPtr<IIOWaitableItem> watchItem; AuSPtr<IIOWaitableItem> pWatchItem;
/** /**
* @brief Input source * @brief Input source
*/ */
AuSPtr<IStreamReader> reader; AuSPtr<IStreamReader> pReader;
/** /**
* @brief Callbacks * @brief Callbacks
*/ */
AuSPtr<IPipeBackend> backend; AuSPtr<IPipeBackend> pBackend;
/** /**
* @brief Enables aggressive stream consumption, allowing for bias towards clients if they were to send a lot of data (including dos) * @brief Enables aggressive stream consumption, allowing for bias towards clients if they were to send a lot of data (including dos)

View File

@ -16,31 +16,31 @@ namespace Aurora::IO
/** /**
* @brief Amount of bytes to transfer or zero if run until EoS/EoF * @brief Amount of bytes to transfer or zero if run until EoS/EoF
*/ */
AuUInt32 lengthOrZero {}; AuUInt32 uLengthOrZero {};
/** /**
* @brief true if the underlying stream uses relative stream positions * @brief true if the underlying stream uses relative stream positions
* (IE: a network or pipe stream that cannot seek backwards and fowards) * (IE: a network or pipe stream that cannot seek backwards and fowards)
* (Inversely, file devices and similar IO subsystems use IO packets with absolute offsets) * (Inversely, file devices and similar IO subsystems use IO packets with absolute offsets)
*/ */
bool isStream {false}; bool bIsStream {false};
/** /**
* @brief internal frame size, that is one iteration of file or stream read, in bytes or zero if fallback * @brief internal frame size, that is one iteration of file or stream read, in bytes or zero if fallback
* Windows is inclined to read every single requested byte of a file stream asynchronously * Windows is inclined to read every single requested byte of a file stream asynchronously
* Streams, on all platforms, yield as soon as there is data available to copy over (usually) * Streams, on all platforms, yield as soon as there is data available to copy over (usually)
*/ */
AuUInt32 pageLengthOrZero {}; AuUInt32 uPageLengthOrZero {};
/** /**
* @brief internal buffer size or zero if fallback * @brief internal buffer size or zero if fallback
*/ */
AuUInt32 bufferLengthOrZero {}; AuUInt32 uBufferLengthOrZero {};
/** /**
* @brief event listener * @brief event listener
*/ */
AuSPtr<IIOPipeEventListener> listener; AuSPtr<IIOPipeEventListener> pListener;
/** /**
* @brief Used as the buffer size for streams of page length 0 * @brief Used as the buffer size for streams of page length 0

View File

@ -13,7 +13,7 @@ namespace Aurora::IO
struct IOPipeRequestAIO : IOPipeRequest struct IOPipeRequestAIO : IOPipeRequest
{ {
AuSPtr<IAsyncTransaction> asyncTransaction; AuSPtr<IAsyncTransaction> pAsyncTransaction;
IOPipeCallback output; IOPipeCallback output;
}; };

View File

@ -15,7 +15,7 @@ namespace Aurora::IO
* @param timeout Timeout in milliseconds, zero = indefinite * @param timeout Timeout in milliseconds, zero = indefinite
* @return true on preemption, false on timeout * @return true on preemption, false on timeout
*/ */
AUKN_SYM bool WaitFor(AuUInt32 milliseconds, bool waitEntireFrame = true); AUKN_SYM bool WaitFor(AuUInt32 uMilliseconds, bool bWaitEntireFrame = true);
/** /**
* Sleeps for the given timeout in milliseconds * Sleeps for the given timeout in milliseconds
@ -24,7 +24,7 @@ namespace Aurora::IO
* @param timeout Timeout in milliseconds, zero = indefinite * @param timeout Timeout in milliseconds, zero = indefinite
* @return True if an IO event occurred * @return True if an IO event occurred
*/ */
AUKN_SYM bool IOSleep(AuUInt32 milliseconds); AUKN_SYM bool IOSleep(AuUInt32 uMilliseconds);
/** /**
* Sleeps up to the given timeout in milliseconds * Sleeps up to the given timeout in milliseconds
@ -33,7 +33,7 @@ namespace Aurora::IO
* @param timeout Timeout in milliseconds, zero = indefinite * @param timeout Timeout in milliseconds, zero = indefinite
* @return True if an IO event occurred * @return True if an IO event occurred
*/ */
AUKN_SYM bool IOYieldFor(AuUInt32 milliseconds); AUKN_SYM bool IOYieldFor(AuUInt32 uMilliseconds);
/** /**
* Nonblocking yield for IO alerts * Nonblocking yield for IO alerts

View File

@ -11,7 +11,8 @@ namespace Aurora::IO
{ {
AUKN_INTERFACE(ISeekingReader, AUKN_INTERFACE(ISeekingReader,
AUI_METHOD(EStreamError, IsOpen, ()), AUI_METHOD(EStreamError, IsOpen, ()),
AUI_METHOD(EStreamError, ArbitraryRead, (AuUInt, offset, const Memory::MemoryViewStreamWrite&, paramters)), AUI_METHOD(EStreamError, ArbitraryRead, (AuUInt, uOffset,
const Memory::MemoryViewStreamWrite&, writeView)),
AUI_METHOD(void, Close, ()) AUI_METHOD(void, Close, ())
); );
} }

View File

@ -13,10 +13,10 @@ namespace Aurora::IO
{ {
struct IOBufferedProcessor : IIOBufferedProcessor struct IOBufferedProcessor : IIOBufferedProcessor
{ {
AuSPtr<IStreamReader> source; AuSPtr<IStreamReader> pSource;
AuSPtr<IStreamWriter> drain; AuSPtr<IStreamWriter> pDrain;
AuSPtr<IIOPipeInterceptor> processor; AuSPtr<IIOPipeInterceptor> pProcessor;
AuUInt32 bufferSize {}; AuUInt32 uBufferSize {};
AuByteBuffer buffer; AuByteBuffer buffer;
AU_DEFINE_FOR_VA(IOBufferedProcessor, AU_DEFINE_FOR_VA(IOBufferedProcessor,
@ -25,7 +25,7 @@ namespace Aurora::IO
AU_DEFINE_EQUALS_VA, // add equals operator AU_DEFINE_EQUALS_VA, // add equals operator
AU_DEFINE_MOVE_VA, // add move assignment operator AU_DEFINE_MOVE_VA, // add move assignment operator
AU_DEFINE_COPY_VA), // add copy assignment operator AU_DEFINE_COPY_VA), // add copy assignment operator
(source, drain, processor, bufferSize)); (pSource, pDrain, pProcessor, uBufferSize));
AuUInt32 TryProcessBuffered() override; AuUInt32 TryProcessBuffered() override;
@ -39,7 +39,7 @@ namespace Aurora::IO
{ {
if (this->buffer.IsEmpty()) if (this->buffer.IsEmpty())
{ {
this->buffer.Allocate(this->bufferSize); this->buffer.Allocate(this->uBufferSize);
this->buffer.flagCircular = true; // !!! this->buffer.flagCircular = true; // !!!
} }
@ -54,7 +54,7 @@ namespace Aurora::IO
AuUInt read {}; AuUInt read {};
try try
{ {
if (this->source->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) != if (this->pSource->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) !=
AuIO::EStreamError::eErrorNone) AuIO::EStreamError::eErrorNone)
{ {
return TryPump(); return TryPump();
@ -82,12 +82,17 @@ namespace Aurora::IO
do do
{ {
if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length)
{
this->buffer.readPtr = this->buffer.base;
}
AuUInt canRead = this->buffer.RemainingBytes(); AuUInt canRead = this->buffer.RemainingBytes();
canRead = AuMin<AuUInt>(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr); canRead = AuMin<AuUInt>(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr);
try try
{ {
if (!this->processor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->drain)) if (!this->pProcessor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->pDrain))
{ {
break; break;
} }
@ -100,7 +105,7 @@ namespace Aurora::IO
this->buffer.readPtr += bytesProcessed; this->buffer.readPtr += bytesProcessed;
bytesProcessedTotal += bytesProcessed; bytesProcessedTotal += bytesProcessed;
if (this->buffer.readPtr == this->buffer.base + this->buffer.length) if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length)
{ {
this->buffer.readPtr = this->buffer.base; this->buffer.readPtr = this->buffer.base;
} }
@ -117,14 +122,14 @@ namespace Aurora::IO
AuUInt32 IOBufferedProcessor::GetRawBytesLimit() AuUInt32 IOBufferedProcessor::GetRawBytesLimit()
{ {
return this->bufferSize; return this->uBufferSize;
} }
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &source, AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &pSource,
const AuSPtr<IIOPipeInterceptor> &processor, const AuSPtr<IIOPipeInterceptor> &pProcessor,
const AuSPtr<IStreamWriter> &drain, const AuSPtr<IStreamWriter> &pDrain,
AuUInt32 bufferSize) AuUInt32 uBufferSize)
{ {
return AuMakeShared<IOBufferedProcessor>(source, drain, processor, bufferSize); return AuMakeShared<IOBufferedProcessor>(pSource, pDrain, pProcessor, uBufferSize);
} }
} }

View File

@ -19,13 +19,13 @@ namespace Aurora::IO
endCallback(this), endCallback(this),
output(request.output) output(request.output)
{ {
this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize;
this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize;
this->bytesWrittenLimit_ = request.lengthOrZero; this->uBytesWrittenLimit_ = request.uLengthOrZero;
this->asyncTransaction_ = request.asyncTransaction; this->pAsyncTransaction_ = request.pAsyncTransaction;
this->asyncAdapter_ = NewAsyncStreamAdapter(request.asyncTransaction, request.isStream); this->pAsyncAdapter_ = NewAsyncStreamAdapter(request.pAsyncTransaction, request.bIsStream);
SysAssert(this->asyncAdapter_); SysAssert(this->pAsyncAdapter_);
this->asyncStreamReader_ = this->asyncAdapter_->ToStreamReader(); this->pAsyncStreamReader_ = this->pAsyncAdapter_->ToStreamReader();
} }
IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request) : IOPipeWork::IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request) :
@ -35,9 +35,9 @@ namespace Aurora::IO
endCallback(this), endCallback(this),
output(request.output) output(request.output)
{ {
this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize;
this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize;
this->bytesWrittenLimit_ = request.lengthOrZero; this->uBytesWrittenLimit_ = request.uLengthOrZero;
} }
void IOPipeWork::Tick_FrameEpilogue() void IOPipeWork::Tick_FrameEpilogue()
@ -50,7 +50,7 @@ namespace Aurora::IO
void IOPipeWork::Tick_Any() void IOPipeWork::Tick_Any()
{ {
if (this->asyncTransaction_) if (this->pAsyncTransaction_)
{ {
this->AsyncPump(); this->AsyncPump();
} }
@ -110,9 +110,9 @@ namespace Aurora::IO
{ {
AuSPtr<IIOProcessorItem> ret; AuSPtr<IIOProcessorItem> ret;
if (this->asyncTransaction_) if (this->pAsyncTransaction_)
{ {
auto pWaitable = this->asyncAdapter_->ToWaitable(); auto pWaitable = this->pAsyncAdapter_->ToWaitable();
if (pWaitable) if (pWaitable)
{ {
ret = this->parent_->parent_->StartIOWatch(pWaitable, AuSharedFromThis()); ret = this->parent_->parent_->StartIOWatch(pWaitable, AuSharedFromThis());
@ -124,7 +124,7 @@ namespace Aurora::IO
} }
else else
{ {
ret = this->parent_->parent_->StartIOWatch(this->input_.watchItem, AuSharedFromThis()); ret = this->parent_->parent_->StartIOWatch(this->input_.pWatchItem, AuSharedFromThis());
if (!ret) if (!ret)
{ {
@ -132,7 +132,7 @@ namespace Aurora::IO
} }
} }
this->watch = ret; this->pWatch = ret;
if (this->parent_->parent_->CheckThread()) if (this->parent_->parent_->CheckThread())
{ {
@ -165,7 +165,7 @@ namespace Aurora::IO
return; return;
} }
if (!this->buffer_.Allocate(this->bufferSize_, AuHwInfo::GetPageSize(), true)) if (!this->buffer_.Allocate(this->uBufferSize_, AuHwInfo::GetPageSize(), true))
{ {
SysPushErrorMem(); SysPushErrorMem();
TerminateOnThread(true); TerminateOnThread(true);
@ -185,7 +185,7 @@ namespace Aurora::IO
{ {
AuMemoryViewWrite internalBuffer; AuMemoryViewWrite internalBuffer;
auto err = this->asyncStreamReader_->Dequeue(0, internalBuffer); auto err = this->pAsyncStreamReader_->Dequeue(0, internalBuffer);
if (err != EStreamError::eErrorNone) if (err != EStreamError::eErrorNone)
{ {
SysPushErrorIO("Async Stream Error: {}", err); SysPushErrorIO("Async Stream Error: {}", err);
@ -200,7 +200,7 @@ namespace Aurora::IO
return; return;
} }
err = this->asyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer); err = this->pAsyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer);
if (err != EStreamError::eErrorNone) if (err != EStreamError::eErrorNone)
{ {
SysPushErrorIO("Async Stream Error: {}", err); SysPushErrorIO("Async Stream Error: {}", err);
@ -221,7 +221,7 @@ namespace Aurora::IO
AuUInt read {}; AuUInt read {};
try try
{ {
if (this->input_.reader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) != if (this->input_.pReader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) !=
AuIO::EStreamError::eErrorNone) AuIO::EStreamError::eErrorNone)
{ {
TerminateOnThread(); TerminateOnThread();
@ -248,11 +248,11 @@ namespace Aurora::IO
{ {
AuUInt canBuffer = this->buffer_.RemainingWrite(); AuUInt canBuffer = this->buffer_.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr));
canBuffer = AuMin(AuUInt(this->frameCap_), canBuffer); canBuffer = AuMin(AuUInt(this->uFrameCap_), canBuffer);
this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer); this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer);
if (this->asyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) != if (this->pAsyncStreamReader_->BeginRead(AuSPtr<AuMemoryViewWrite>(this->SharedFromThis(), &this->nextWriteAsync_)) !=
AuIO::EStreamError::eErrorNone) AuIO::EStreamError::eErrorNone)
{ {
TerminateOnThread(true); TerminateOnThread(true);
@ -284,15 +284,15 @@ namespace Aurora::IO
return; return;
} }
if (this->asyncTransaction_) if (this->pAsyncTransaction_)
{ {
ReadNextAsync(); ReadNextAsync();
} }
else else
{ {
if (this->input_.backend) if (this->input_.pBackend)
{ {
this->input_.backend->OnEndPump(); this->input_.pBackend->OnEndPump();
} }
} }
} }
@ -323,7 +323,7 @@ namespace Aurora::IO
{ {
if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart) if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart)
{ {
if (!this->output.handleBufferedStream.onData->OnDataAvailable(this->buffer_)) if (!this->output.handleBufferedStream.pOnData->OnDataAvailable(this->buffer_))
{ {
bytesProcessed = 0; bytesProcessed = 0;
this->buffer_.readPtr = this->buffer_.base + readHead; this->buffer_.readPtr = this->buffer_.base + readHead;
@ -335,7 +335,7 @@ namespace Aurora::IO
} }
else else
{ {
if (!this->output.forwardStream.intercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.writer)) if (!this->output.forwardStream.pIntercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.pWriter))
{ {
bytesProcessed = 0; bytesProcessed = 0;
} }
@ -390,7 +390,7 @@ namespace Aurora::IO
{ {
if (this->output.forwardStream.bFlushWriter) if (this->output.forwardStream.bFlushWriter)
{ {
this->output.forwardStream.writer->Flush(); this->output.forwardStream.pWriter->Flush();
} }
} }
} }
@ -401,11 +401,11 @@ namespace Aurora::IO
this->bShouldReadNext = true; this->bShouldReadNext = true;
} }
this->bytesWritten_ += bytesProcessedTotal; this->uBytesWritten_ += bytesProcessedTotal;
if (this->request.listener) if (this->request.pListener)
{ {
this->request.listener->OnPipePartialEvent(bytesProcessedTotal); this->request.pListener->OnPipePartialEvent(bytesProcessedTotal);
} }
return bytesProcessedTotal; return bytesProcessedTotal;
@ -413,7 +413,7 @@ namespace Aurora::IO
bool IOPipeWork::IsAtRequestedEnd() bool IOPipeWork::IsAtRequestedEnd()
{ {
return this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ <= this->bytesWritten_); return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_);
} }
AuByteBuffer *IOPipeWork::GetBuffer() AuByteBuffer *IOPipeWork::GetBuffer()
@ -423,12 +423,12 @@ namespace Aurora::IO
void IOPipeWork::RunOnThread() void IOPipeWork::RunOnThread()
{ {
if (this->input_.backend) if (this->input_.pBackend)
{ {
this->input_.backend->OnStart(); this->input_.pBackend->OnStart();
} }
if (this->asyncTransaction_) if (this->pAsyncTransaction_)
{ {
PrepareAsync(); PrepareAsync();
} }
@ -447,50 +447,50 @@ namespace Aurora::IO
this->bActive = false; this->bActive = false;
if (this->watch) if (this->pWatch)
{ {
watch->StopWatch(); this->pWatch->StopWatch();
} }
if (this->request.listener) if (this->request.pListener)
{ {
if (error) if (error)
{ {
// We explicitly failed... // We explicitly failed...
this->request.listener->OnPipeFailureEvent(); this->request.pListener->OnPipeFailureEvent();
} }
else if (this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ > this->bytesWritten_)) else if (this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ > this->uBytesWritten_))
{ {
// Finished without error early // Finished without error early
this->request.listener->OnPipeFailureEvent(); this->request.pListener->OnPipeFailureEvent();
} }
else else
{ {
// We finished... // We finished...
this->request.listener->OnPipeSuccessEvent(); this->request.pListener->OnPipeSuccessEvent();
} }
this->request.listener.reset(); this->request.pListener.reset();
} }
this->output.handleBufferedStream.onData.reset(); this->output.handleBufferedStream.pOnData.reset();
this->output.forwardStream.intercepter.reset(); this->output.forwardStream.pIntercepter.reset();
this->output.forwardStream.writer.reset(); this->output.forwardStream.pWriter.reset();
if (this->input_.backend) if (this->input_.pBackend)
{ {
this->input_.backend->OnEnd(); this->input_.pBackend->OnEnd();
this->input_.backend.reset(); this->input_.pBackend.reset();
} }
if (auto transaction = this->asyncTransaction_) if (auto transaction = this->pAsyncTransaction_)
{ {
transaction->Reset(); transaction->Reset();
this->asyncTransaction_.reset(); this->pAsyncTransaction_.reset();
} }
this->asyncAdapter_.reset(); this->pAsyncAdapter_.reset();
this->asyncStreamReader_.reset(); this->pAsyncStreamReader_.reset();
} }
IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) : IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) :

View File

@ -35,10 +35,10 @@ namespace Aurora::IO
struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis<IIOPipeWork> struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis<IIOPipeWork>
{ {
IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestAIO &request); IOPipeWork(const AuSPtr<IOPipeProcessor> &pParent, const IOPipeRequestAIO &request);
IOPipeWork(const AuSPtr<IOPipeProcessor> &parent, const IOPipeRequestBasic &request); IOPipeWork(const AuSPtr<IOPipeProcessor> &pParent, const IOPipeRequestBasic &request);
AuSPtr<IIOProcessorItem> watch; AuSPtr<IIOProcessorItem> pWatch;
void Tick_FrameEpilogue() override; void Tick_FrameEpilogue() override;
void Tick_Any() override; void Tick_Any() override;
@ -51,7 +51,7 @@ namespace Aurora::IO
virtual bool End() override; virtual bool End() override;
void RunOnThread(); void RunOnThread();
void TerminateOnThread(bool error = false); void TerminateOnThread(bool bError = false);
// INIT // INIT
void PrepareStream(); void PrepareStream();
@ -80,19 +80,19 @@ namespace Aurora::IO
struct /*not a union. the following members are mutex*/ struct /*not a union. the following members are mutex*/
{ {
IOPipeInputData input_; IOPipeInputData input_;
AuSPtr<IAsyncTransaction> asyncTransaction_; AuSPtr<IAsyncTransaction> pAsyncTransaction_;
AuSPtr<IAsyncStreamAdapter> asyncAdapter_; AuSPtr<IAsyncStreamAdapter> pAsyncAdapter_;
AuSPtr<IAsyncStreamReader> asyncStreamReader_; AuSPtr<IAsyncStreamReader> pAsyncStreamReader_;
}; };
IOPipeCallback output; IOPipeCallback output;
IOWorkStart startCallback; IOWorkStart startCallback;
IOWorkEnd endCallback; IOWorkEnd endCallback;
bool bActive {true}; bool bActive {true};
AuUInt32 bufferSize_ {}; AuUInt32 uBufferSize_ {};
AuUInt32 frameCap_ {}; AuUInt32 uFrameCap_ {};
AuUInt bytesWritten_ {}; AuUInt uBytesWritten_ {};
AuUInt bytesWrittenLimit_ {}; AuUInt uBytesWrittenLimit_ {};
AuByteBuffer buffer_; AuByteBuffer buffer_;
}; };

View File

@ -16,11 +16,12 @@
namespace Aurora::IO namespace Aurora::IO
{ {
IOProcessor::IOProcessor(AuUInt threadId, bool tickOnly, IOProcessor::IOProcessor(AuUInt threadId,
bool bTickOnly,
AuAsync::WorkerPId_t worker, AuAsync::WorkerPId_t worker,
const AuSPtr<AuLoop::ILoopQueue> &loop) : const AuSPtr<AuLoop::ILoopQueue> &pLoopQueue) :
mutliplexIOAndTimer(!tickOnly), mutliplexIOAndTimer(!bTickOnly),
loopQueue(loop), pLoopQueue(pLoopQueue),
asyncWorker(worker), asyncWorker(worker),
threadId(threadId), threadId(threadId),
streamProcessors(this) streamProcessors(this)
@ -101,7 +102,7 @@ namespace Aurora::IO
return; return;
} }
AuStaticCast<Loop::LoopQueue>(this->loopQueue)->AddHook([that = AuSharedFromThis()]() AuStaticCast<Loop::LoopQueue>(this->pLoopQueue)->AddHook([that = AuSharedFromThis()]()
{ {
that->RunTick(); that->RunTick();
}); });
@ -204,9 +205,9 @@ namespace Aurora::IO
for (auto &item : this->items.allItems) for (auto &item : this->items.allItems)
{ {
if (item->item->IsRunOnSelfIO() && item->item->IsRunOnSelfIOCheckedOnTimerTick()) if (item->pItem->IsRunOnSelfIO() && item->pItem->IsRunOnSelfIOCheckedOnTimerTick())
{ {
auto ls = item->item->GetSelfIOSource(); auto ls = item->pItem->GetSelfIOSource();
if (ls && ls->IsSignaled()) if (ls && ls->IsSignaled())
{ {
// Should deadlock without critical sections // Should deadlock without critical sections
@ -220,16 +221,16 @@ namespace Aurora::IO
{ {
for (auto &a : this->items.onTickReceivers) for (auto &a : this->items.onTickReceivers)
{ {
if (a->listener) if (a->pListener)
{ {
try try
{ {
a->listener->Tick_RunOnTick(); a->pListener->Tick_RunOnTick();
if (!AuExists(this->items.workSignaled, a) && if (!AuExists(this->items.workSignaled, a) &&
!AuExists(this->items.onTickReceivers, a)) !AuExists(this->items.onTickReceivers, a))
{ {
a->listener->Tick_Any(); a->pListener->Tick_Any();
} }
} }
catch (...) catch (...)
@ -243,12 +244,12 @@ namespace Aurora::IO
for (auto &a : this->items.workSignaled) for (auto &a : this->items.workSignaled)
{ {
if (a->listener) if (a->pListener)
{ {
try try
{ {
a->listener->Tick_SelfIOEvent(); a->pListener->Tick_SelfIOEvent();
a->listener->Tick_Any(); a->pListener->Tick_Any();
} }
catch (...) catch (...)
{ {
@ -274,12 +275,12 @@ namespace Aurora::IO
continue; continue;
} }
if (a->listener) if (a->pListener)
{ {
try try
{ {
a->listener->Tick_OtherIOEvent(); a->pListener->Tick_OtherIOEvent();
a->listener->Tick_Any(); a->pListener->Tick_Any();
} }
catch (...) catch (...)
{ {
@ -299,14 +300,14 @@ namespace Aurora::IO
for (auto &pumped : this->items.finalizeQueue) for (auto &pumped : this->items.finalizeQueue)
{ {
if (!pumped->listener) if (!pumped->pListener)
{ {
continue; continue;
} }
try try
{ {
pumped->listener->Tick_FrameEpilogue(); pumped->pListener->Tick_FrameEpilogue();
} }
catch (...) catch (...)
{ {
@ -344,15 +345,15 @@ namespace Aurora::IO
try try
{ {
if (processor->listener) if (processor->pListener)
{ {
if (fatal) if (fatal)
{ {
processor->listener->OnFailureCompletion(); processor->pListener->OnFailureCompletion();
} }
else else
{ {
processor->listener->OnNominalCompletion(); processor->pListener->OnNominalCompletion();
} }
} }
} }
@ -361,7 +362,7 @@ namespace Aurora::IO
SysPushErrorCatch(); SysPushErrorCatch();
} }
auto item = processor->item; auto item = processor->pItem;
if (item->IsRunOnTick()) if (item->IsRunOnTick())
{ {
@ -452,7 +453,7 @@ namespace Aurora::IO
} }
else else
{ {
return this->loopQueue->WaitAny(msMax); return this->pLoopQueue->WaitAny(msMax);
} }
} }
@ -558,7 +559,7 @@ namespace Aurora::IO
void IOProcessor::UpdateTimers() void IOProcessor::UpdateTimers()
{ {
this->timers.lsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs); this->timers.pLsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs);
this->timers.nbTicker.nsTimeStep = this->refreshRateNs; this->timers.nbTicker.nsTimeStep = this->refreshRateNs;
if (!this->timers.nbTicker.nextTriggerTime) if (!this->timers.nbTicker.nextTriggerTime)
@ -581,21 +582,21 @@ namespace Aurora::IO
void IOProcessor::AddTimerLS() void IOProcessor::AddTimerLS()
{ {
this->ToQueue()->SourceAdd(this->timers.lsTicker); this->ToQueue()->SourceAdd(this->timers.pLsTicker);
this->ToQueue()->AddCallback(this->timers.lsTicker, AuSPtr<AuLoop::ILoopSourceSubscriber>(AuSharedFromThis(), &this->timers)); this->ToQueue()->AddCallback(this->timers.pLsTicker, AuSPtr<AuLoop::ILoopSourceSubscriber>(AuSharedFromThis(), &this->timers));
this->ToQueue()->Commit(); this->ToQueue()->Commit();
} }
void IOProcessor::StartAsyncTimerIfAny() void IOProcessor::StartAsyncTimerIfAny()
{ {
if (!this->workItem) if (!this->pWorkItem)
{ {
this->workItem = this->asyncWorker.pool->NewWorkItem(this->asyncWorker, AuSharedFromThis()); this->pWorkItem = this->asyncWorker.pool->NewWorkItem(this->asyncWorker, AuSharedFromThis());
} }
this->workItem->SetSchedTimeNs(this->refreshRateNs); this->pWorkItem->SetSchedTimeNs(this->refreshRateNs);
this->workItem->Dispatch(); this->pWorkItem->Dispatch();
} }
void IOProcessor::RemoveTimer() void IOProcessor::RemoveTimer()
@ -612,13 +613,13 @@ namespace Aurora::IO
void IOProcessor::CancelWorkItem() void IOProcessor::CancelWorkItem()
{ {
if (!this->workItem) if (!this->pWorkItem)
{ {
return; return;
} }
this->workItem->Cancel(); this->pWorkItem->Cancel();
this->workItem.reset(); this->pWorkItem.reset();
} }
void IOProcessor::RemoveLSTimer() void IOProcessor::RemoveLSTimer()
@ -629,7 +630,7 @@ namespace Aurora::IO
return; return;
} }
queue->SourceRemove(this->timers.lsTicker); queue->SourceRemove(this->timers.pLsTicker);
} }
bool IOProcessor::IsAsync() bool IOProcessor::IsAsync()
@ -643,13 +644,14 @@ namespace Aurora::IO
this->timers.nbTicker.nextTriggerTime; this->timers.nbTicker.nextTriggerTime;
} }
bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr<IIOProcessorItem> &processor) bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr<IIOProcessorItem> &pProcessor)
{ {
return {}; return {};
} }
AuSPtr<IIOProcessorItem> IOProcessor::StartIOWatchEx(const AuSPtr<IIOWaitableItem> &pItem,
AuSPtr<IIOProcessorItem> IOProcessor::StartIOWatchEx(const AuSPtr<IIOWaitableItem>& object, const AuSPtr<IIOEventListener>& listener, bool singleshot) const AuSPtr<IIOEventListener> &pListener,
bool bSingleshot)
{ {
if (!CheckThread()) if (!CheckThread())
{ {
@ -657,16 +659,16 @@ namespace Aurora::IO
} }
auto item = AuMakeShared<IOProcessorItem>(); auto item = AuMakeShared<IOProcessorItem>();
item->parent = this; item->pParent = this;
item->listener = listener; item->pListener = pListener;
item->item = object; item->pItem = pItem;
item->singleshot = singleshot; item->bSingleshot = bSingleshot;
AU_LOCK_GUARD(this->items.mutex); AU_LOCK_GUARD(this->items.mutex);
this->items.allItems.push_back(item); this->items.allItems.push_back(item);
if (object->IsRunOnTick()) if (pItem->IsRunOnTick())
{ {
if (!AuTryInsert(this->items.onTickReceivers, item)) if (!AuTryInsert(this->items.onTickReceivers, item))
{ {
@ -675,7 +677,7 @@ namespace Aurora::IO
} }
} }
if (object->IsRunOnOtherTick()) if (pItem->IsRunOnOtherTick())
{ {
if (!AuTryInsert(this->items.onOtherReceivers, item)) if (!AuTryInsert(this->items.onOtherReceivers, item))
{ {
@ -684,16 +686,16 @@ namespace Aurora::IO
} }
} }
if (object->IsRunOnSelfIO()) if (pItem->IsRunOnSelfIO())
{ {
auto src = object->GetSelfIOSource(); auto src = pItem->GetSelfIOSource();
if (!src) if (!src)
{ {
SysPushErrorGeneric(); SysPushErrorGeneric();
return {}; return {};
} }
auto timeout = object->IOTimeoutInMS(); auto timeout = pItem->IOTimeoutInMS();
if (timeout) if (timeout)
{ {
if (!this->ToQueue()->SourceAddWithTimeout(src, timeout)) if (!this->ToQueue()->SourceAddWithTimeout(src, timeout))
@ -790,7 +792,7 @@ namespace Aurora::IO
AuSPtr<AuLoop::ILoopQueue> IOProcessor::ToQueue() AuSPtr<AuLoop::ILoopQueue> IOProcessor::ToQueue()
{ {
return this->loopQueue; return this->pLoopQueue;
} }
void IOProcessor::ReleaseAllWatches() void IOProcessor::ReleaseAllWatches()
@ -806,7 +808,7 @@ namespace Aurora::IO
for (auto &io : this->items.registeredIO) for (auto &io : this->items.registeredIO)
{ {
queue->SourceRemove(io->item->GetSelfIOSource()); queue->SourceRemove(io->pItem->GetSelfIOSource());
} }
queue->Commit(); queue->Commit();

View File

@ -85,7 +85,7 @@ namespace Aurora::IO
bool RequestRemovalForItemFromAnyThread(const AuSPtr<IIOProcessorItem> &processor); bool RequestRemovalForItemFromAnyThread(const AuSPtr<IIOProcessorItem> &processor);
AuSPtr<AuLoop::ILoopQueue> loopQueue; AuSPtr<AuLoop::ILoopQueue> pLoopQueue;
AuUInt threadId; AuUInt threadId;
@ -110,7 +110,7 @@ namespace Aurora::IO
IOProcessorItems items; IOProcessorItems items;
IOProcessorTimers timers; IOProcessorTimers timers;
AuAsync::WorkerPId_t asyncWorker; AuAsync::WorkerPId_t asyncWorker;
AuSPtr<AuAsync::IWorkItem> workItem; AuSPtr<AuAsync::IWorkItem> pWorkItem;
AuThreadPrimitives::MutexUnique_t mutex; AuThreadPrimitives::MutexUnique_t mutex;

View File

@ -14,34 +14,34 @@ namespace Aurora::IO
{ {
bool IOProcessorItem::StopWatch() bool IOProcessorItem::StopWatch()
{ {
if (this->parent->CheckThread() && if (this->pParent->CheckThread() &&
this->parent->bFrameStart) this->pParent->bFrameStart)
{ {
this->parent->ClearProcessor(AuSharedFromThis(), false); this->pParent->ClearProcessor(AuSharedFromThis(), false);
return true; return true;
} }
return this->parent->items.ScheduleFinish(AuSharedFromThis(), false); return this->pParent->items.ScheduleFinish(AuSharedFromThis(), false);
} }
bool IOProcessorItem::FailWatch() bool IOProcessorItem::FailWatch()
{ {
if (this->parent->CheckThread() && if (this->pParent->CheckThread() &&
this->parent->bFrameStart) this->pParent->bFrameStart)
{ {
this->parent->ClearProcessor(AuSharedFromThis(), true); this->pParent->ClearProcessor(AuSharedFromThis(), true);
return false; return false;
} }
return this->parent->items.ScheduleFinish(AuSharedFromThis(), true); return this->pParent->items.ScheduleFinish(AuSharedFromThis(), true);
} }
bool IOProcessorItem::OnFinished(const AuSPtr<AuLoop::ILoopSource>& source, AuUInt8 pos) bool IOProcessorItem::OnFinished(const AuSPtr<AuLoop::ILoopSource>& source, AuUInt8 pos)
{ {
if (this->singleshot) if (this->bSingleshot)
{ {
if (AuExchange(this->triggered, true)) if (AuExchange(this->bTriggered, true))
{ {
StopWatch(); StopWatch();
return false; return false;
@ -49,7 +49,7 @@ namespace Aurora::IO
} }
IOAlert(false); IOAlert(false);
return this->singleshot; return this->bSingleshot;
} }
void IOProcessorItem::OnTimeout(const AuSPtr<AuLoop::ILoopSource>& source) void IOProcessorItem::OnTimeout(const AuSPtr<AuLoop::ILoopSource>& source)
@ -65,31 +65,31 @@ namespace Aurora::IO
void IOProcessorItem::IOAlert(bool force) void IOProcessorItem::IOAlert(bool force)
{ {
if (!this->parent->IsTickOnly()) if (!this->pParent->IsTickOnly())
{ {
if (!this->parent->bFrameStart) if (!this->pParent->bFrameStart)
{ {
if (force) if (force)
{ {
this->parent->TickFor(AuSharedFromThis()); this->pParent->TickFor(AuSharedFromThis());
} }
else else
{ {
this->parent->TickForHack(AuSharedFromThis()); this->pParent->TickForHack(AuSharedFromThis());
} }
} }
else else
{ {
this->parent->TickForRegister(AuSharedFromThis()); this->pParent->TickForRegister(AuSharedFromThis());
} }
} }
else if (force) // manual tick else if (force) // manual tick
{ {
if (this->parent->CheckThread()) if (this->pParent->CheckThread())
{ {
this->parent->TickFor(AuSharedFromThis()); this->pParent->TickFor(AuSharedFromThis());
} }
else if (parent->IsAsync()) else if (this->pParent->IsAsync())
{ {
// TODO: // TODO:
} }
@ -99,8 +99,7 @@ namespace Aurora::IO
} }
} }
if (this->bSingleshot)
if (this->singleshot)
{ {
StopWatch(); StopWatch();
} }

View File

@ -13,12 +13,12 @@ namespace Aurora::IO
struct IOProcessorItem : AuLoop::ILoopSourceSubscriberEx, IIOProcessorItem, AuEnableSharedFromThis<IOProcessorItem> struct IOProcessorItem : AuLoop::ILoopSourceSubscriberEx, IIOProcessorItem, AuEnableSharedFromThis<IOProcessorItem>
{ {
IOProcessor *parent; IOProcessor *pParent;
bool singleshot{}; bool bSingleshot{};
bool triggered{}; bool bTriggered{};
AuSPtr<IIOWaitableItem> item; AuSPtr<IIOWaitableItem> pItem;
AuSPtr<IIOEventListener> listener; AuSPtr<IIOEventListener> pListener;
// ILoopSourceSubscriber // ILoopSourceSubscriber
bool OnFinished(const AuSPtr<AuLoop::ILoopSource> &source, AuUInt8 pos) override; bool OnFinished(const AuSPtr<AuLoop::ILoopSource> &source, AuUInt8 pos) override;
@ -34,5 +34,4 @@ namespace Aurora::IO
// //
void IOAlert(bool force); void IOAlert(bool force);
}; };
} }

View File

@ -12,16 +12,16 @@
namespace Aurora::IO namespace Aurora::IO
{ {
bool IOProcessorTimers::Init(IOProcessor *parent, AuSPtr<AuLoop::ITimer> lsTicker) bool IOProcessorTimers::Init(IOProcessor *pParent, AuSPtr<AuLoop::ITimer> pLsTicker)
{ {
this->parent = parent; this->pParent = pParent;
this->lsTicker = lsTicker; this->pLsTicker = pLsTicker;
return bool(this->parent) && bool(lsTicker); return bool(this->pParent) && bool(pLsTicker);
} }
bool IOProcessorTimers::OnFinished(const AuSPtr<AuLoop::ILoopSource> &source) bool IOProcessorTimers::OnFinished(const AuSPtr<AuLoop::ILoopSource> &pSource)
{ {
this->parent->ManualTick(); this->pParent->ManualTick();
return false; return false;
} }
} }

View File

@ -13,8 +13,8 @@ namespace Aurora::IO
struct IOProcessorTimers : AuLoop::ILoopSourceSubscriber struct IOProcessorTimers : AuLoop::ILoopSourceSubscriber
{ {
IOProcessor *parent {}; IOProcessor *pParent {};
AuSPtr<AuLoop::ITimer> lsTicker; AuSPtr<AuLoop::ITimer> pLsTicker;
Utility::RateLimiter nbTicker; Utility::RateLimiter nbTicker;
bool Init(IOProcessor *parent, AuSPtr<AuLoop::ITimer> lsTicker); bool Init(IOProcessor *parent, AuSPtr<AuLoop::ITimer> lsTicker);

View File

@ -11,7 +11,7 @@
namespace Aurora::IO namespace Aurora::IO
{ {
IOSimpleEventListener::IOSimpleEventListener(const AuSPtr<IIOSimpleEventListener> &parent) : parent(parent) IOSimpleEventListener::IOSimpleEventListener(const AuSPtr<IIOSimpleEventListener> &pParent) : pParent(pParent)
{ {
} }
@ -33,7 +33,7 @@ namespace Aurora::IO
void IOSimpleEventListener::Tick_Any() void IOSimpleEventListener::Tick_Any()
{ {
this->parent->OnIOTick(); this->pParent->OnIOTick();
} }
void IOSimpleEventListener::Tick_FrameEpilogue() void IOSimpleEventListener::Tick_FrameEpilogue()
@ -43,21 +43,21 @@ namespace Aurora::IO
void IOSimpleEventListener::OnFailureCompletion() void IOSimpleEventListener::OnFailureCompletion()
{ {
this->parent->OnIOFailure(); this->pParent->OnIOFailure();
} }
void IOSimpleEventListener::OnNominalCompletion() void IOSimpleEventListener::OnNominalCompletion()
{ {
this->parent->OnIOComplete(); this->pParent->OnIOComplete();
} }
AuSPtr<IIOEventListener> DesimplifyIOEventListenerAdapter(const AuSPtr<IIOSimpleEventListener> &interface) AuSPtr<IIOEventListener> DesimplifyIOEventListenerAdapter(const AuSPtr<IIOSimpleEventListener> &pInterface)
{ {
if (!interface) if (!pInterface)
{ {
return {}; return {};
} }
return AuMakeShared<IOSimpleEventListener>(interface); return AuMakeShared<IOSimpleEventListener>(pInterface);
} }
} }

View File

@ -11,7 +11,7 @@ namespace Aurora::IO
{ {
struct IOSimpleEventListener : IIOEventListener struct IOSimpleEventListener : IIOEventListener
{ {
IOSimpleEventListener(const AuSPtr<IIOSimpleEventListener> &parent); IOSimpleEventListener(const AuSPtr<IIOSimpleEventListener> &pParent);
void Tick_RunOnTick() override; void Tick_RunOnTick() override;
void Tick_OtherIOEvent() override; void Tick_OtherIOEvent() override;
@ -21,6 +21,6 @@ namespace Aurora::IO
void OnFailureCompletion() override; void OnFailureCompletion() override;
void OnNominalCompletion() override; void OnNominalCompletion() override;
AuSPtr<IIOSimpleEventListener> parent; AuSPtr<IIOSimpleEventListener> pParent;
}; };
} }

View File

@ -77,12 +77,12 @@ namespace Aurora::IO::Net
auto sharedThis = AuSPtr<SocketChannelInput>(this->pParent_->SharedFromThis(), this); auto sharedThis = AuSPtr<SocketChannelInput>(this->pParent_->SharedFromThis(), this);
AuIO::IOPipeRequestAIO req; AuIO::IOPipeRequestAIO req;
req.output.handleBufferedStream.onData = AuUnsafeRaiiToShared(this); req.output.handleBufferedStream.pOnData = AuUnsafeRaiiToShared(this);
req.output.type = EPipeCallbackType::eTryHandleBufferedPart; req.output.type = EPipeCallbackType::eTryHandleBufferedPart;
req.asyncTransaction = this->pNetReadTransaction; req.pAsyncTransaction = this->pNetReadTransaction;
req.isStream = true; req.bIsStream = true;
req.listener = sharedThis; req.pListener = sharedThis;
req.bufferLengthOrZero = AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->uBytesInputBuffer; req.uBufferLengthOrZero = AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->uBytesInputBuffer;
this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(req); this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(req);
} }
@ -99,6 +99,7 @@ namespace Aurora::IO::Net
this->pParent_->SendErrorNoStream({}); this->pParent_->SendErrorNoStream({});
return; return;
} }
this->pNetReadTransaction->SetCallback(sharedThis);//; AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this)); this->pNetReadTransaction->SetCallback(sharedThis);//; AuSPtr<IAsyncFinishedSubscriber>(this->pParent_->SharedFromThis(), this));
IncrementWorker(); IncrementWorker();
} }