From 401cf0996267da87b7d4d98c46a7a15ceeb11013 Mon Sep 17 00:00:00 2001 From: Reece Wilson Date: Mon, 29 Aug 2022 16:46:46 +0100 Subject: [PATCH] [*] Cleanup/refactor of AuIO --- .../Aurora/IO/IAsyncFinishedSubscriber.hpp | 2 +- Include/Aurora/IO/IAsyncTransaction.hpp | 8 +- Include/Aurora/IO/IByteBufferStreamPair.hpp | 4 +- Include/Aurora/IO/IIOBufferedProcessor.hpp | 17 ++- Include/Aurora/IO/IIOPipeEventListener.hpp | 2 +- Include/Aurora/IO/IIOProcessor.hpp | 26 ++++- .../Aurora/IO/IIOProcessorManualInvoker.hpp | 3 + Include/Aurora/IO/IIOSimpleEventListener.hpp | 5 +- Include/Aurora/IO/IIOWaitableIOLoopSource.hpp | 19 +++- Include/Aurora/IO/IIOWaitableItem.hpp | 2 - Include/Aurora/IO/IOAdapterAsyncStream.hpp | 2 +- Include/Aurora/IO/IOAdapterByteBuffer.hpp | 6 +- Include/Aurora/IO/IOAdapterCompression.hpp | 4 +- Include/Aurora/IO/IOAdapterSeeking.hpp | 2 +- Include/Aurora/IO/IOPipeCallback.hpp | 6 +- Include/Aurora/IO/IOPipeInputData.hpp | 6 +- Include/Aurora/IO/IOPipeRequest.hpp | 10 +- Include/Aurora/IO/IOPipeRequestAIO.hpp | 2 +- Include/Aurora/IO/IOSleep.hpp | 6 +- Include/Aurora/IO/ISeekingReader.hpp | 3 +- Source/IO/IOBufferedProcessor.cpp | 35 +++--- Source/IO/IOPipeProcessor.cpp | 104 +++++++++--------- Source/IO/IOPipeProcessor.hpp | 22 ++-- Source/IO/IOProcessor.cpp | 98 +++++++++-------- Source/IO/IOProcessor.hpp | 4 +- Source/IO/IOProcessorItem.cpp | 41 ++++--- Source/IO/IOProcessorItem.hpp | 11 +- Source/IO/IOProcessorTimers.cpp | 12 +- Source/IO/IOProcessorTimers.hpp | 4 +- Source/IO/IOSimpleEventListener.cpp | 14 +-- Source/IO/IOSimpleEventListener.hpp | 4 +- Source/IO/Net/AuNetSocketChannelInput.cpp | 11 +- 32 files changed, 274 insertions(+), 221 deletions(-) diff --git a/Include/Aurora/IO/IAsyncFinishedSubscriber.hpp b/Include/Aurora/IO/IAsyncFinishedSubscriber.hpp index b9363a39..c1b3fe01 100644 --- a/Include/Aurora/IO/IAsyncFinishedSubscriber.hpp +++ b/Include/Aurora/IO/IAsyncFinishedSubscriber.hpp @@ -10,6 +10,6 @@ namespace Aurora::IO { AUKN_INTERFACE(IAsyncFinishedSubscriber, - AUI_METHOD(void, OnAsyncFileOpFinished, (AuUInt64, offset, AuUInt32, length)) + AUI_METHOD(void, OnAsyncFileOpFinished, (AuUInt64, uOffset, AuUInt32, uLength)) ); } \ No newline at end of file diff --git a/Include/Aurora/IO/IAsyncTransaction.hpp b/Include/Aurora/IO/IAsyncTransaction.hpp index cc6302bf..2f2407f9 100644 --- a/Include/Aurora/IO/IAsyncTransaction.hpp +++ b/Include/Aurora/IO/IAsyncTransaction.hpp @@ -24,8 +24,8 @@ namespace Aurora::IO */ struct IAsyncTransaction { - virtual bool StartRead(AuUInt64 offset, const AuSPtr &memoryView) = 0; - virtual bool StartWrite(AuUInt64 offset, const AuSPtr &memoryView) = 0; + virtual bool StartRead(AuUInt64 uOffset, const AuSPtr &memoryView) = 0; + virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr &memoryView) = 0; /** * @brief Non-blocking is-signaled and call callback poll routine @@ -53,12 +53,12 @@ namespace Aurora::IO * @brief Registers an NT-like APC callback for the IO transaction. * Can be executed under any Aurora loop subsystem sleep */ - virtual void SetCallback(const AuSPtr &sub) = 0; + virtual void SetCallback(const AuSPtr &pSubscriber) = 0; /** * @brief Block for completion */ - virtual bool Wait(AuUInt32 timeout) = 0; + virtual bool Wait(AuUInt32 uTimeout) = 0; /** * @brief Provides a loop source that becomes signaled once the transaction is complete. diff --git a/Include/Aurora/IO/IByteBufferStreamPair.hpp b/Include/Aurora/IO/IByteBufferStreamPair.hpp index 9560f670..b3e4275f 100644 --- a/Include/Aurora/IO/IByteBufferStreamPair.hpp +++ b/Include/Aurora/IO/IByteBufferStreamPair.hpp @@ -19,6 +19,6 @@ namespace Aurora::IO }; AUKN_SYM AuSPtr NewByteBufferPair(); - AUKN_SYM AuSPtr NewByteBufferPairEx(AuUInt length, bool permitResize); - AUKN_SYM AuSPtr NewRingByteBuffer(AuUInt length); + AUKN_SYM AuSPtr NewByteBufferPairEx(AuUInt uLength, bool bPermitResize); + AUKN_SYM AuSPtr NewRingByteBuffer(AuUInt uLength); } \ No newline at end of file diff --git a/Include/Aurora/IO/IIOBufferedProcessor.hpp b/Include/Aurora/IO/IIOBufferedProcessor.hpp index 1ff5c415..77615418 100644 --- a/Include/Aurora/IO/IIOBufferedProcessor.hpp +++ b/Include/Aurora/IO/IIOBufferedProcessor.hpp @@ -16,8 +16,17 @@ namespace Aurora::IO virtual AuUInt32 GetRawBytesLimit() = 0; }; - AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &source, - const AuSPtr &processor, - const AuSPtr &drain, // TODO: share IOPipeCallback - AuUInt32 bufferSize); + /** + * @brief + * @param source + * @param processor + * @param drain + * @param bufferSize + * @deprecated by Aurora::IO::Protocol (tobe AuProtocol:: and AuIOProtocol::) + * @return + */ + AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &pSource, + const AuSPtr &pProcessor, + const AuSPtr &pDrain, // TODO: share IOPipeCallback + AuUInt32 uBufferSize); } \ No newline at end of file diff --git a/Include/Aurora/IO/IIOPipeEventListener.hpp b/Include/Aurora/IO/IIOPipeEventListener.hpp index ada4af2c..b8562662 100644 --- a/Include/Aurora/IO/IIOPipeEventListener.hpp +++ b/Include/Aurora/IO/IIOPipeEventListener.hpp @@ -12,7 +12,7 @@ namespace Aurora::IO struct IOPipeData; AUKN_INTERFACE(IIOPipeEventListener, - AUI_METHOD(void, OnPipePartialEvent, (AuUInt, transferred)), + AUI_METHOD(void, OnPipePartialEvent, (AuUInt, uTransferred)), AUI_METHOD(void, OnPipeSuccessEvent, ()), AUI_METHOD(void, OnPipeFailureEvent, ()) ); diff --git a/Include/Aurora/IO/IIOProcessor.hpp b/Include/Aurora/IO/IIOProcessor.hpp index 0395be04..3531359b 100644 --- a/Include/Aurora/IO/IIOProcessor.hpp +++ b/Include/Aurora/IO/IIOProcessor.hpp @@ -55,7 +55,27 @@ namespace Aurora::IO virtual bool HasItems () = 0; }; - AUKN_SYM AuSPtr NewIOProcessor(bool tickOnly, const AuSPtr &queue); - AUKN_SYM AuSPtr NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id); - AUKN_SYM AuSPtr NewIOProcessorNoQueue(bool tickOnly); + /** + * @brief Creates an IOProcessor to execute tasks on a given loop queue + * @param bTickOnly True if kernel yielding on IO objects is disabled in favor of forced TPS. + * Do note that the throughput of a tick will differ per platform. Accept rate, + * throughput, etc may need tweaking, if you wish to drive an IIOProcessor using + * a timer or the manual tick functions. + * @param pQueue + * @return + */ + AUKN_SYM AuSPtr NewIOProcessor(bool bTickOnly, const AuSPtr &pQueue); + + /** + * @brief Creates an IOProcessor to execute tasks on the loop queue of a given AuAsync thread id + * @param bTickOnly True if kernel yielding on IO objects is disabled in favor of forced TPS. + * Do note that the throughput of a tick will differ per platform. Accept rate, + * throughput, etc may need tweaking, if you wish to drive an IIOProcessor using + * a timer or the manual tick functions. + * @param pQueue + * @return + */ + AUKN_SYM AuSPtr NewIOProcessorOnThread(bool bTickOnly, Async::WorkerPId_t id); + + AUKN_SYM AuSPtr NewIOProcessorNoQueue(bool bTickOnly); } \ No newline at end of file diff --git a/Include/Aurora/IO/IIOProcessorManualInvoker.hpp b/Include/Aurora/IO/IIOProcessorManualInvoker.hpp index a8c585a6..a748ecd6 100644 --- a/Include/Aurora/IO/IIOProcessorManualInvoker.hpp +++ b/Include/Aurora/IO/IIOProcessorManualInvoker.hpp @@ -11,6 +11,9 @@ namespace Aurora::IO { struct IIOProcessorManualInvoker { + /** + * @brief Used to invoke a tick on an IIOProcessor + */ virtual void InvokeManualTick() = 0; }; } \ No newline at end of file diff --git a/Include/Aurora/IO/IIOSimpleEventListener.hpp b/Include/Aurora/IO/IIOSimpleEventListener.hpp index 87989e1c..a17a0d30 100644 --- a/Include/Aurora/IO/IIOSimpleEventListener.hpp +++ b/Include/Aurora/IO/IIOSimpleEventListener.hpp @@ -15,5 +15,8 @@ namespace Aurora::IO AUI_METHOD(void, OnIOComplete, ()) ); - AUKN_SYM AuSPtr DesimplifyIOEventListenerAdapter(const AuSPtr &interface); + /** + * @brief Creates an adapter to convert an object that implements the simple listener interface to the annoying internal listener interface (7 callbacks) + */ + AUKN_SYM AuSPtr DesimplifyIOEventListenerAdapter(const AuSPtr &pInterface); } \ No newline at end of file diff --git a/Include/Aurora/IO/IIOWaitableIOLoopSource.hpp b/Include/Aurora/IO/IIOWaitableIOLoopSource.hpp index add746ee..8b426db0 100644 --- a/Include/Aurora/IO/IIOWaitableIOLoopSource.hpp +++ b/Include/Aurora/IO/IIOWaitableIOLoopSource.hpp @@ -12,9 +12,22 @@ namespace Aurora::IO struct IIOWatachableIOLoopSource : IIOWaitableItem { virtual AuSPtr GetLoopSource() = 0; - virtual AuSPtr SetLoopSource(const AuSPtr &ls) = 0; + virtual AuSPtr SetLoopSource(const AuSPtr &pLoopSource) = 0; }; - AUKN_SYM AuSPtr NewWaitableLoopSource(const AuSPtr &ptr); - AUKN_SYM AuSPtr NewWaitableLoopSourceEx(const AuSPtr& ptr, AuUInt32 msTimeout); + /** + * @brief Creates an adapter given an ILoopSource for the IO subsystem to wait on + * @param pLoopSource + * @return + */ + AUKN_SYM AuSPtr NewWaitableLoopSource(const AuSPtr &pLoopSource); + + /** + * @brief Creates an adapter given an ILoopSource for the IO subsystem to wait on + * @param pLoopSource + * @param uMsTimeout + * @return + */ + AUKN_SYM AuSPtr NewWaitableLoopSourceEx(const AuSPtr &pLoopSource, + AuUInt32 uMsTimeout); } \ No newline at end of file diff --git a/Include/Aurora/IO/IIOWaitableItem.hpp b/Include/Aurora/IO/IIOWaitableItem.hpp index 09c781ad..676ffcc6 100644 --- a/Include/Aurora/IO/IIOWaitableItem.hpp +++ b/Include/Aurora/IO/IIOWaitableItem.hpp @@ -9,7 +9,6 @@ namespace Aurora::IO { - // Unstable and hacky interface // You shouldn't try to implement this yourself // Defer to IIOWaitableIOLoopSource, IIOWaitableIOTimer, and IIOWaitableTickLimiter for intended impl usage @@ -29,5 +28,4 @@ namespace Aurora::IO AUI_METHOD(bool, ApplyRateLimit, ()) ); - } \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterAsyncStream.hpp b/Include/Aurora/IO/IOAdapterAsyncStream.hpp index 18f37272..c81cdeec 100644 --- a/Include/Aurora/IO/IOAdapterAsyncStream.hpp +++ b/Include/Aurora/IO/IOAdapterAsyncStream.hpp @@ -46,5 +46,5 @@ namespace Aurora::IO virtual bool Reset() = 0; }; - AUKN_SYM AuSPtr NewAsyncStreamAdapter(const AuSPtr &transaction, bool isStream); + AUKN_SYM AuSPtr NewAsyncStreamAdapter(const AuSPtr &pTransaction, bool bIsStream); } \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterByteBuffer.hpp b/Include/Aurora/IO/IOAdapterByteBuffer.hpp index 09d816a8..6224dbae 100644 --- a/Include/Aurora/IO/IOAdapterByteBuffer.hpp +++ b/Include/Aurora/IO/IOAdapterByteBuffer.hpp @@ -9,7 +9,7 @@ namespace Aurora::IO { - AUKN_SYM AuSPtr NewByteBufferReadAdapter(const AuSPtr &buffer); - AUKN_SYM AuSPtr NewByteBufferLinearSeekableAdapter(const AuSPtr &buffer); - AUKN_SYM AuSPtr NewByteBufferWriteAdapter(const AuSPtr &buffer); + AUKN_SYM AuSPtr NewByteBufferReadAdapter(const AuSPtr &pBuffer); + AUKN_SYM AuSPtr NewByteBufferLinearSeekableAdapter(const AuSPtr &pBuffer); + AUKN_SYM AuSPtr NewByteBufferWriteAdapter(const AuSPtr &pBuffer); } \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterCompression.hpp b/Include/Aurora/IO/IOAdapterCompression.hpp index 480e220b..fd89a3ce 100644 --- a/Include/Aurora/IO/IOAdapterCompression.hpp +++ b/Include/Aurora/IO/IOAdapterCompression.hpp @@ -9,6 +9,6 @@ namespace Aurora::IO { - AUKN_SYM AuSPtr NewCompressionReadAdapter(const AuSPtr &compresionStream); - AUKN_SYM AuSPtr NewCompressionSeekingAdapter(const AuSPtr &compresionStream); + AUKN_SYM AuSPtr NewCompressionReadAdapter(const AuSPtr &pCompresionStream); + AUKN_SYM AuSPtr NewCompressionSeekingAdapter(const AuSPtr &pCompresionStream); } \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterSeeking.hpp b/Include/Aurora/IO/IOAdapterSeeking.hpp index d2626967..1ea88aea 100644 --- a/Include/Aurora/IO/IOAdapterSeeking.hpp +++ b/Include/Aurora/IO/IOAdapterSeeking.hpp @@ -9,5 +9,5 @@ namespace Aurora::IO { - AUKN_SYM AuSPtr NewSeekingReadAdapter(const AuSPtr &reader); + AUKN_SYM AuSPtr NewSeekingReadAdapter(const AuSPtr &pReader); } \ No newline at end of file diff --git a/Include/Aurora/IO/IOPipeCallback.hpp b/Include/Aurora/IO/IOPipeCallback.hpp index 962ad3aa..db0574ac 100644 --- a/Include/Aurora/IO/IOPipeCallback.hpp +++ b/Include/Aurora/IO/IOPipeCallback.hpp @@ -18,13 +18,13 @@ namespace Aurora::IO struct { - AuSPtr onData; + AuSPtr pOnData; } handleBufferedStream; struct { - AuSPtr intercepter; - AuSPtr writer; + AuSPtr pIntercepter; + AuSPtr pWriter; bool bFlushWriter {true}; } forwardStream; diff --git a/Include/Aurora/IO/IOPipeInputData.hpp b/Include/Aurora/IO/IOPipeInputData.hpp index eb536a30..c1ba8f29 100644 --- a/Include/Aurora/IO/IOPipeInputData.hpp +++ b/Include/Aurora/IO/IOPipeInputData.hpp @@ -16,17 +16,17 @@ namespace Aurora::IO /** * @brief IO trigger */ - AuSPtr watchItem; + AuSPtr pWatchItem; /** * @brief Input source */ - AuSPtr reader; + AuSPtr pReader; /** * @brief Callbacks */ - AuSPtr backend; + AuSPtr pBackend; /** * @brief Enables aggressive stream consumption, allowing for bias towards clients if they were to send a lot of data (including dos) diff --git a/Include/Aurora/IO/IOPipeRequest.hpp b/Include/Aurora/IO/IOPipeRequest.hpp index 628e75af..7e30849f 100644 --- a/Include/Aurora/IO/IOPipeRequest.hpp +++ b/Include/Aurora/IO/IOPipeRequest.hpp @@ -16,31 +16,31 @@ namespace Aurora::IO /** * @brief Amount of bytes to transfer or zero if run until EoS/EoF */ - AuUInt32 lengthOrZero {}; + AuUInt32 uLengthOrZero {}; /** * @brief true if the underlying stream uses relative stream positions * (IE: a network or pipe stream that cannot seek backwards and fowards) * (Inversely, file devices and similar IO subsystems use IO packets with absolute offsets) */ - bool isStream {false}; + bool bIsStream {false}; /** * @brief internal frame size, that is one iteration of file or stream read, in bytes or zero if fallback * Windows is inclined to read every single requested byte of a file stream asynchronously * Streams, on all platforms, yield as soon as there is data available to copy over (usually) */ - AuUInt32 pageLengthOrZero {}; + AuUInt32 uPageLengthOrZero {}; /** * @brief internal buffer size or zero if fallback */ - AuUInt32 bufferLengthOrZero {}; + AuUInt32 uBufferLengthOrZero {}; /** * @brief event listener */ - AuSPtr listener; + AuSPtr pListener; /** * @brief Used as the buffer size for streams of page length 0 diff --git a/Include/Aurora/IO/IOPipeRequestAIO.hpp b/Include/Aurora/IO/IOPipeRequestAIO.hpp index 171d9afe..169000c2 100644 --- a/Include/Aurora/IO/IOPipeRequestAIO.hpp +++ b/Include/Aurora/IO/IOPipeRequestAIO.hpp @@ -13,7 +13,7 @@ namespace Aurora::IO struct IOPipeRequestAIO : IOPipeRequest { - AuSPtr asyncTransaction; + AuSPtr pAsyncTransaction; IOPipeCallback output; }; diff --git a/Include/Aurora/IO/IOSleep.hpp b/Include/Aurora/IO/IOSleep.hpp index da170a57..2f7c5aa0 100644 --- a/Include/Aurora/IO/IOSleep.hpp +++ b/Include/Aurora/IO/IOSleep.hpp @@ -15,7 +15,7 @@ namespace Aurora::IO * @param timeout Timeout in milliseconds, zero = indefinite * @return true on preemption, false on timeout */ - AUKN_SYM bool WaitFor(AuUInt32 milliseconds, bool waitEntireFrame = true); + AUKN_SYM bool WaitFor(AuUInt32 uMilliseconds, bool bWaitEntireFrame = true); /** * Sleeps for the given timeout in milliseconds @@ -24,7 +24,7 @@ namespace Aurora::IO * @param timeout Timeout in milliseconds, zero = indefinite * @return True if an IO event occurred */ - AUKN_SYM bool IOSleep(AuUInt32 milliseconds); + AUKN_SYM bool IOSleep(AuUInt32 uMilliseconds); /** * Sleeps up to the given timeout in milliseconds @@ -33,7 +33,7 @@ namespace Aurora::IO * @param timeout Timeout in milliseconds, zero = indefinite * @return True if an IO event occurred */ - AUKN_SYM bool IOYieldFor(AuUInt32 milliseconds); + AUKN_SYM bool IOYieldFor(AuUInt32 uMilliseconds); /** * Nonblocking yield for IO alerts diff --git a/Include/Aurora/IO/ISeekingReader.hpp b/Include/Aurora/IO/ISeekingReader.hpp index c735f99f..b08503a6 100644 --- a/Include/Aurora/IO/ISeekingReader.hpp +++ b/Include/Aurora/IO/ISeekingReader.hpp @@ -11,7 +11,8 @@ namespace Aurora::IO { AUKN_INTERFACE(ISeekingReader, AUI_METHOD(EStreamError, IsOpen, ()), - AUI_METHOD(EStreamError, ArbitraryRead, (AuUInt, offset, const Memory::MemoryViewStreamWrite&, paramters)), + AUI_METHOD(EStreamError, ArbitraryRead, (AuUInt, uOffset, + const Memory::MemoryViewStreamWrite&, writeView)), AUI_METHOD(void, Close, ()) ); } \ No newline at end of file diff --git a/Source/IO/IOBufferedProcessor.cpp b/Source/IO/IOBufferedProcessor.cpp index ea544652..188ffcd5 100644 --- a/Source/IO/IOBufferedProcessor.cpp +++ b/Source/IO/IOBufferedProcessor.cpp @@ -13,10 +13,10 @@ namespace Aurora::IO { struct IOBufferedProcessor : IIOBufferedProcessor { - AuSPtr source; - AuSPtr drain; - AuSPtr processor; - AuUInt32 bufferSize {}; + AuSPtr pSource; + AuSPtr pDrain; + AuSPtr pProcessor; + AuUInt32 uBufferSize {}; AuByteBuffer buffer; AU_DEFINE_FOR_VA(IOBufferedProcessor, @@ -25,7 +25,7 @@ namespace Aurora::IO AU_DEFINE_EQUALS_VA, // add equals operator AU_DEFINE_MOVE_VA, // add move assignment operator AU_DEFINE_COPY_VA), // add copy assignment operator - (source, drain, processor, bufferSize)); + (pSource, pDrain, pProcessor, uBufferSize)); AuUInt32 TryProcessBuffered() override; @@ -39,7 +39,7 @@ namespace Aurora::IO { if (this->buffer.IsEmpty()) { - this->buffer.Allocate(this->bufferSize); + this->buffer.Allocate(this->uBufferSize); this->buffer.flagCircular = true; // !!! } @@ -54,7 +54,7 @@ namespace Aurora::IO AuUInt read {}; try { - if (this->source->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) != + if (this->pSource->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) != AuIO::EStreamError::eErrorNone) { return TryPump(); @@ -82,12 +82,17 @@ namespace Aurora::IO do { + if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length) + { + this->buffer.readPtr = this->buffer.base; + } + AuUInt canRead = this->buffer.RemainingBytes(); canRead = AuMin(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr); try { - if (!this->processor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->drain)) + if (!this->pProcessor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->pDrain)) { break; } @@ -100,7 +105,7 @@ namespace Aurora::IO this->buffer.readPtr += bytesProcessed; bytesProcessedTotal += bytesProcessed; - if (this->buffer.readPtr == this->buffer.base + this->buffer.length) + if (this->buffer.flagCircular && this->buffer.readPtr == this->buffer.base + this->buffer.length) { this->buffer.readPtr = this->buffer.base; } @@ -117,14 +122,14 @@ namespace Aurora::IO AuUInt32 IOBufferedProcessor::GetRawBytesLimit() { - return this->bufferSize; + return this->uBufferSize; } - AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &source, - const AuSPtr &processor, - const AuSPtr &drain, - AuUInt32 bufferSize) + AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &pSource, + const AuSPtr &pProcessor, + const AuSPtr &pDrain, + AuUInt32 uBufferSize) { - return AuMakeShared(source, drain, processor, bufferSize); + return AuMakeShared(pSource, pDrain, pProcessor, uBufferSize); } } \ No newline at end of file diff --git a/Source/IO/IOPipeProcessor.cpp b/Source/IO/IOPipeProcessor.cpp index be0291c1..a440b226 100644 --- a/Source/IO/IOPipeProcessor.cpp +++ b/Source/IO/IOPipeProcessor.cpp @@ -19,13 +19,13 @@ namespace Aurora::IO endCallback(this), output(request.output) { - this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; - this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; - this->bytesWrittenLimit_ = request.lengthOrZero; - this->asyncTransaction_ = request.asyncTransaction; - this->asyncAdapter_ = NewAsyncStreamAdapter(request.asyncTransaction, request.isStream); - SysAssert(this->asyncAdapter_); - this->asyncStreamReader_ = this->asyncAdapter_->ToStreamReader(); + this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize; + this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize; + this->uBytesWrittenLimit_ = request.uLengthOrZero; + this->pAsyncTransaction_ = request.pAsyncTransaction; + this->pAsyncAdapter_ = NewAsyncStreamAdapter(request.pAsyncTransaction, request.bIsStream); + SysAssert(this->pAsyncAdapter_); + this->pAsyncStreamReader_ = this->pAsyncAdapter_->ToStreamReader(); } IOPipeWork::IOPipeWork(const AuSPtr &parent, const IOPipeRequestBasic &request) : @@ -35,9 +35,9 @@ namespace Aurora::IO endCallback(this), output(request.output) { - this->bufferSize_ = request.bufferLengthOrZero ? request.bufferLengthOrZero : request.kFallbackBufferSize; - this->frameCap_ = request.pageLengthOrZero ? request.pageLengthOrZero : request.kFallbackPageSize; - this->bytesWrittenLimit_ = request.lengthOrZero; + this->uBufferSize_ = request.uBufferLengthOrZero ? request.uBufferLengthOrZero : request.kFallbackBufferSize; + this->uFrameCap_ = request.uPageLengthOrZero ? request.uPageLengthOrZero : request.kFallbackPageSize; + this->uBytesWrittenLimit_ = request.uLengthOrZero; } void IOPipeWork::Tick_FrameEpilogue() @@ -50,7 +50,7 @@ namespace Aurora::IO void IOPipeWork::Tick_Any() { - if (this->asyncTransaction_) + if (this->pAsyncTransaction_) { this->AsyncPump(); } @@ -110,9 +110,9 @@ namespace Aurora::IO { AuSPtr ret; - if (this->asyncTransaction_) + if (this->pAsyncTransaction_) { - auto pWaitable = this->asyncAdapter_->ToWaitable(); + auto pWaitable = this->pAsyncAdapter_->ToWaitable(); if (pWaitable) { ret = this->parent_->parent_->StartIOWatch(pWaitable, AuSharedFromThis()); @@ -124,7 +124,7 @@ namespace Aurora::IO } else { - ret = this->parent_->parent_->StartIOWatch(this->input_.watchItem, AuSharedFromThis()); + ret = this->parent_->parent_->StartIOWatch(this->input_.pWatchItem, AuSharedFromThis()); if (!ret) { @@ -132,7 +132,7 @@ namespace Aurora::IO } } - this->watch = ret; + this->pWatch = ret; if (this->parent_->parent_->CheckThread()) { @@ -165,7 +165,7 @@ namespace Aurora::IO return; } - if (!this->buffer_.Allocate(this->bufferSize_, AuHwInfo::GetPageSize(), true)) + if (!this->buffer_.Allocate(this->uBufferSize_, AuHwInfo::GetPageSize(), true)) { SysPushErrorMem(); TerminateOnThread(true); @@ -185,7 +185,7 @@ namespace Aurora::IO { AuMemoryViewWrite internalBuffer; - auto err = this->asyncStreamReader_->Dequeue(0, internalBuffer); + auto err = this->pAsyncStreamReader_->Dequeue(0, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); @@ -200,7 +200,7 @@ namespace Aurora::IO return; } - err = this->asyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer); + err = this->pAsyncStreamReader_->Dequeue(internalBuffer.length, internalBuffer); if (err != EStreamError::eErrorNone) { SysPushErrorIO("Async Stream Error: {}", err); @@ -221,7 +221,7 @@ namespace Aurora::IO AuUInt read {}; try { - if (this->input_.reader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) != + if (this->input_.pReader->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer_.writePtr, canBuffer), read)) != AuIO::EStreamError::eErrorNone) { TerminateOnThread(); @@ -248,11 +248,11 @@ namespace Aurora::IO { AuUInt canBuffer = this->buffer_.RemainingWrite(); canBuffer = AuMin(canBuffer, AuUInt((this->buffer_.length + this->buffer_.base) - this->buffer_.writePtr)); - canBuffer = AuMin(AuUInt(this->frameCap_), canBuffer); + canBuffer = AuMin(AuUInt(this->uFrameCap_), canBuffer); this->nextWriteAsync_ = AuMemoryViewWrite(this->buffer_.writePtr, canBuffer); - if (this->asyncStreamReader_->BeginRead(AuSPtr(this->SharedFromThis(), &this->nextWriteAsync_)) != + if (this->pAsyncStreamReader_->BeginRead(AuSPtr(this->SharedFromThis(), &this->nextWriteAsync_)) != AuIO::EStreamError::eErrorNone) { TerminateOnThread(true); @@ -284,15 +284,15 @@ namespace Aurora::IO return; } - if (this->asyncTransaction_) + if (this->pAsyncTransaction_) { ReadNextAsync(); } else { - if (this->input_.backend) + if (this->input_.pBackend) { - this->input_.backend->OnEndPump(); + this->input_.pBackend->OnEndPump(); } } } @@ -323,7 +323,7 @@ namespace Aurora::IO { if (this->output.type == EPipeCallbackType::eTryHandleBufferedPart) { - if (!this->output.handleBufferedStream.onData->OnDataAvailable(this->buffer_)) + if (!this->output.handleBufferedStream.pOnData->OnDataAvailable(this->buffer_)) { bytesProcessed = 0; this->buffer_.readPtr = this->buffer_.base + readHead; @@ -335,7 +335,7 @@ namespace Aurora::IO } else { - if (!this->output.forwardStream.intercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.writer)) + if (!this->output.forwardStream.pIntercepter->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer_.readPtr, canRead), bytesProcessed), this->output.forwardStream.pWriter)) { bytesProcessed = 0; } @@ -390,7 +390,7 @@ namespace Aurora::IO { if (this->output.forwardStream.bFlushWriter) { - this->output.forwardStream.writer->Flush(); + this->output.forwardStream.pWriter->Flush(); } } } @@ -401,11 +401,11 @@ namespace Aurora::IO this->bShouldReadNext = true; } - this->bytesWritten_ += bytesProcessedTotal; + this->uBytesWritten_ += bytesProcessedTotal; - if (this->request.listener) + if (this->request.pListener) { - this->request.listener->OnPipePartialEvent(bytesProcessedTotal); + this->request.pListener->OnPipePartialEvent(bytesProcessedTotal); } return bytesProcessedTotal; @@ -413,7 +413,7 @@ namespace Aurora::IO bool IOPipeWork::IsAtRequestedEnd() { - return this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ <= this->bytesWritten_); + return this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ <= this->uBytesWritten_); } AuByteBuffer *IOPipeWork::GetBuffer() @@ -423,12 +423,12 @@ namespace Aurora::IO void IOPipeWork::RunOnThread() { - if (this->input_.backend) + if (this->input_.pBackend) { - this->input_.backend->OnStart(); + this->input_.pBackend->OnStart(); } - if (this->asyncTransaction_) + if (this->pAsyncTransaction_) { PrepareAsync(); } @@ -447,50 +447,50 @@ namespace Aurora::IO this->bActive = false; - if (this->watch) + if (this->pWatch) { - watch->StopWatch(); + this->pWatch->StopWatch(); } - if (this->request.listener) + if (this->request.pListener) { if (error) { // We explicitly failed... - this->request.listener->OnPipeFailureEvent(); + this->request.pListener->OnPipeFailureEvent(); } - else if (this->bytesWrittenLimit_ && (this->bytesWrittenLimit_ > this->bytesWritten_)) + else if (this->uBytesWrittenLimit_ && (this->uBytesWrittenLimit_ > this->uBytesWritten_)) { // Finished without error early - this->request.listener->OnPipeFailureEvent(); + this->request.pListener->OnPipeFailureEvent(); } else { // We finished... - this->request.listener->OnPipeSuccessEvent(); + this->request.pListener->OnPipeSuccessEvent(); } - this->request.listener.reset(); + this->request.pListener.reset(); } - this->output.handleBufferedStream.onData.reset(); - this->output.forwardStream.intercepter.reset(); - this->output.forwardStream.writer.reset(); + this->output.handleBufferedStream.pOnData.reset(); + this->output.forwardStream.pIntercepter.reset(); + this->output.forwardStream.pWriter.reset(); - if (this->input_.backend) + if (this->input_.pBackend) { - this->input_.backend->OnEnd(); - this->input_.backend.reset(); + this->input_.pBackend->OnEnd(); + this->input_.pBackend.reset(); } - if (auto transaction = this->asyncTransaction_) + if (auto transaction = this->pAsyncTransaction_) { transaction->Reset(); - this->asyncTransaction_.reset(); + this->pAsyncTransaction_.reset(); } - this->asyncAdapter_.reset(); - this->asyncStreamReader_.reset(); + this->pAsyncAdapter_.reset(); + this->pAsyncStreamReader_.reset(); } IOPipeProcessor::IOPipeProcessor(IOProcessor *parent) : diff --git a/Source/IO/IOPipeProcessor.hpp b/Source/IO/IOPipeProcessor.hpp index d1ace239..b342ad5d 100644 --- a/Source/IO/IOPipeProcessor.hpp +++ b/Source/IO/IOPipeProcessor.hpp @@ -35,10 +35,10 @@ namespace Aurora::IO struct IOPipeWork : IIOPipeWork, IIOEventListenerFunctional, AuEnableSharedFromThis { - IOPipeWork(const AuSPtr &parent, const IOPipeRequestAIO &request); - IOPipeWork(const AuSPtr &parent, const IOPipeRequestBasic &request); + IOPipeWork(const AuSPtr &pParent, const IOPipeRequestAIO &request); + IOPipeWork(const AuSPtr &pParent, const IOPipeRequestBasic &request); - AuSPtr watch; + AuSPtr pWatch; void Tick_FrameEpilogue() override; void Tick_Any() override; @@ -51,7 +51,7 @@ namespace Aurora::IO virtual bool End() override; void RunOnThread(); - void TerminateOnThread(bool error = false); + void TerminateOnThread(bool bError = false); // INIT void PrepareStream(); @@ -80,19 +80,19 @@ namespace Aurora::IO struct /*not a union. the following members are mutex*/ { IOPipeInputData input_; - AuSPtr asyncTransaction_; - AuSPtr asyncAdapter_; - AuSPtr asyncStreamReader_; + AuSPtr pAsyncTransaction_; + AuSPtr pAsyncAdapter_; + AuSPtr pAsyncStreamReader_; }; IOPipeCallback output; IOWorkStart startCallback; IOWorkEnd endCallback; bool bActive {true}; - AuUInt32 bufferSize_ {}; - AuUInt32 frameCap_ {}; - AuUInt bytesWritten_ {}; - AuUInt bytesWrittenLimit_ {}; + AuUInt32 uBufferSize_ {}; + AuUInt32 uFrameCap_ {}; + AuUInt uBytesWritten_ {}; + AuUInt uBytesWrittenLimit_ {}; AuByteBuffer buffer_; }; diff --git a/Source/IO/IOProcessor.cpp b/Source/IO/IOProcessor.cpp index 9cafceef..d41f9af0 100644 --- a/Source/IO/IOProcessor.cpp +++ b/Source/IO/IOProcessor.cpp @@ -16,11 +16,12 @@ namespace Aurora::IO { - IOProcessor::IOProcessor(AuUInt threadId, bool tickOnly, + IOProcessor::IOProcessor(AuUInt threadId, + bool bTickOnly, AuAsync::WorkerPId_t worker, - const AuSPtr &loop) : - mutliplexIOAndTimer(!tickOnly), - loopQueue(loop), + const AuSPtr &pLoopQueue) : + mutliplexIOAndTimer(!bTickOnly), + pLoopQueue(pLoopQueue), asyncWorker(worker), threadId(threadId), streamProcessors(this) @@ -101,7 +102,7 @@ namespace Aurora::IO return; } - AuStaticCast(this->loopQueue)->AddHook([that = AuSharedFromThis()]() + AuStaticCast(this->pLoopQueue)->AddHook([that = AuSharedFromThis()]() { that->RunTick(); }); @@ -204,9 +205,9 @@ namespace Aurora::IO for (auto &item : this->items.allItems) { - if (item->item->IsRunOnSelfIO() && item->item->IsRunOnSelfIOCheckedOnTimerTick()) + if (item->pItem->IsRunOnSelfIO() && item->pItem->IsRunOnSelfIOCheckedOnTimerTick()) { - auto ls = item->item->GetSelfIOSource(); + auto ls = item->pItem->GetSelfIOSource(); if (ls && ls->IsSignaled()) { // Should deadlock without critical sections @@ -220,16 +221,16 @@ namespace Aurora::IO { for (auto &a : this->items.onTickReceivers) { - if (a->listener) + if (a->pListener) { try { - a->listener->Tick_RunOnTick(); + a->pListener->Tick_RunOnTick(); if (!AuExists(this->items.workSignaled, a) && !AuExists(this->items.onTickReceivers, a)) { - a->listener->Tick_Any(); + a->pListener->Tick_Any(); } } catch (...) @@ -243,12 +244,12 @@ namespace Aurora::IO for (auto &a : this->items.workSignaled) { - if (a->listener) + if (a->pListener) { try { - a->listener->Tick_SelfIOEvent(); - a->listener->Tick_Any(); + a->pListener->Tick_SelfIOEvent(); + a->pListener->Tick_Any(); } catch (...) { @@ -274,12 +275,12 @@ namespace Aurora::IO continue; } - if (a->listener) + if (a->pListener) { try { - a->listener->Tick_OtherIOEvent(); - a->listener->Tick_Any(); + a->pListener->Tick_OtherIOEvent(); + a->pListener->Tick_Any(); } catch (...) { @@ -299,14 +300,14 @@ namespace Aurora::IO for (auto &pumped : this->items.finalizeQueue) { - if (!pumped->listener) + if (!pumped->pListener) { continue; } try { - pumped->listener->Tick_FrameEpilogue(); + pumped->pListener->Tick_FrameEpilogue(); } catch (...) { @@ -344,15 +345,15 @@ namespace Aurora::IO try { - if (processor->listener) + if (processor->pListener) { if (fatal) { - processor->listener->OnFailureCompletion(); + processor->pListener->OnFailureCompletion(); } else { - processor->listener->OnNominalCompletion(); + processor->pListener->OnNominalCompletion(); } } } @@ -361,7 +362,7 @@ namespace Aurora::IO SysPushErrorCatch(); } - auto item = processor->item; + auto item = processor->pItem; if (item->IsRunOnTick()) { @@ -452,7 +453,7 @@ namespace Aurora::IO } else { - return this->loopQueue->WaitAny(msMax); + return this->pLoopQueue->WaitAny(msMax); } } @@ -558,7 +559,7 @@ namespace Aurora::IO void IOProcessor::UpdateTimers() { - this->timers.lsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs); + this->timers.pLsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs); this->timers.nbTicker.nsTimeStep = this->refreshRateNs; if (!this->timers.nbTicker.nextTriggerTime) @@ -581,21 +582,21 @@ namespace Aurora::IO void IOProcessor::AddTimerLS() { - this->ToQueue()->SourceAdd(this->timers.lsTicker); - this->ToQueue()->AddCallback(this->timers.lsTicker, AuSPtr(AuSharedFromThis(), &this->timers)); + this->ToQueue()->SourceAdd(this->timers.pLsTicker); + this->ToQueue()->AddCallback(this->timers.pLsTicker, AuSPtr(AuSharedFromThis(), &this->timers)); this->ToQueue()->Commit(); } void IOProcessor::StartAsyncTimerIfAny() { - if (!this->workItem) + if (!this->pWorkItem) { - this->workItem = this->asyncWorker.pool->NewWorkItem(this->asyncWorker, AuSharedFromThis()); + this->pWorkItem = this->asyncWorker.pool->NewWorkItem(this->asyncWorker, AuSharedFromThis()); } - this->workItem->SetSchedTimeNs(this->refreshRateNs); + this->pWorkItem->SetSchedTimeNs(this->refreshRateNs); - this->workItem->Dispatch(); + this->pWorkItem->Dispatch(); } void IOProcessor::RemoveTimer() @@ -612,13 +613,13 @@ namespace Aurora::IO void IOProcessor::CancelWorkItem() { - if (!this->workItem) + if (!this->pWorkItem) { return; } - this->workItem->Cancel(); - this->workItem.reset(); + this->pWorkItem->Cancel(); + this->pWorkItem.reset(); } void IOProcessor::RemoveLSTimer() @@ -629,7 +630,7 @@ namespace Aurora::IO return; } - queue->SourceRemove(this->timers.lsTicker); + queue->SourceRemove(this->timers.pLsTicker); } bool IOProcessor::IsAsync() @@ -643,13 +644,14 @@ namespace Aurora::IO this->timers.nbTicker.nextTriggerTime; } - bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr &processor) + bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr &pProcessor) { return {}; } - - AuSPtr IOProcessor::StartIOWatchEx(const AuSPtr& object, const AuSPtr& listener, bool singleshot) + AuSPtr IOProcessor::StartIOWatchEx(const AuSPtr &pItem, + const AuSPtr &pListener, + bool bSingleshot) { if (!CheckThread()) { @@ -657,16 +659,16 @@ namespace Aurora::IO } auto item = AuMakeShared(); - item->parent = this; - item->listener = listener; - item->item = object; - item->singleshot = singleshot; + item->pParent = this; + item->pListener = pListener; + item->pItem = pItem; + item->bSingleshot = bSingleshot; AU_LOCK_GUARD(this->items.mutex); this->items.allItems.push_back(item); - if (object->IsRunOnTick()) + if (pItem->IsRunOnTick()) { if (!AuTryInsert(this->items.onTickReceivers, item)) { @@ -675,7 +677,7 @@ namespace Aurora::IO } } - if (object->IsRunOnOtherTick()) + if (pItem->IsRunOnOtherTick()) { if (!AuTryInsert(this->items.onOtherReceivers, item)) { @@ -684,16 +686,16 @@ namespace Aurora::IO } } - if (object->IsRunOnSelfIO()) + if (pItem->IsRunOnSelfIO()) { - auto src = object->GetSelfIOSource(); + auto src = pItem->GetSelfIOSource(); if (!src) { SysPushErrorGeneric(); return {}; } - auto timeout = object->IOTimeoutInMS(); + auto timeout = pItem->IOTimeoutInMS(); if (timeout) { if (!this->ToQueue()->SourceAddWithTimeout(src, timeout)) @@ -790,7 +792,7 @@ namespace Aurora::IO AuSPtr IOProcessor::ToQueue() { - return this->loopQueue; + return this->pLoopQueue; } void IOProcessor::ReleaseAllWatches() @@ -806,7 +808,7 @@ namespace Aurora::IO for (auto &io : this->items.registeredIO) { - queue->SourceRemove(io->item->GetSelfIOSource()); + queue->SourceRemove(io->pItem->GetSelfIOSource()); } queue->Commit(); diff --git a/Source/IO/IOProcessor.hpp b/Source/IO/IOProcessor.hpp index 6fe3af8d..613f054b 100644 --- a/Source/IO/IOProcessor.hpp +++ b/Source/IO/IOProcessor.hpp @@ -85,7 +85,7 @@ namespace Aurora::IO bool RequestRemovalForItemFromAnyThread(const AuSPtr &processor); - AuSPtr loopQueue; + AuSPtr pLoopQueue; AuUInt threadId; @@ -110,7 +110,7 @@ namespace Aurora::IO IOProcessorItems items; IOProcessorTimers timers; AuAsync::WorkerPId_t asyncWorker; - AuSPtr workItem; + AuSPtr pWorkItem; AuThreadPrimitives::MutexUnique_t mutex; diff --git a/Source/IO/IOProcessorItem.cpp b/Source/IO/IOProcessorItem.cpp index bfe61ac5..1dbff306 100644 --- a/Source/IO/IOProcessorItem.cpp +++ b/Source/IO/IOProcessorItem.cpp @@ -14,34 +14,34 @@ namespace Aurora::IO { bool IOProcessorItem::StopWatch() { - if (this->parent->CheckThread() && - this->parent->bFrameStart) + if (this->pParent->CheckThread() && + this->pParent->bFrameStart) { - this->parent->ClearProcessor(AuSharedFromThis(), false); + this->pParent->ClearProcessor(AuSharedFromThis(), false); return true; } - return this->parent->items.ScheduleFinish(AuSharedFromThis(), false); + return this->pParent->items.ScheduleFinish(AuSharedFromThis(), false); } bool IOProcessorItem::FailWatch() { - if (this->parent->CheckThread() && - this->parent->bFrameStart) + if (this->pParent->CheckThread() && + this->pParent->bFrameStart) { - this->parent->ClearProcessor(AuSharedFromThis(), true); + this->pParent->ClearProcessor(AuSharedFromThis(), true); return false; } - return this->parent->items.ScheduleFinish(AuSharedFromThis(), true); + return this->pParent->items.ScheduleFinish(AuSharedFromThis(), true); } bool IOProcessorItem::OnFinished(const AuSPtr& source, AuUInt8 pos) { - if (this->singleshot) + if (this->bSingleshot) { - if (AuExchange(this->triggered, true)) + if (AuExchange(this->bTriggered, true)) { StopWatch(); return false; @@ -49,7 +49,7 @@ namespace Aurora::IO } IOAlert(false); - return this->singleshot; + return this->bSingleshot; } void IOProcessorItem::OnTimeout(const AuSPtr& source) @@ -65,31 +65,31 @@ namespace Aurora::IO void IOProcessorItem::IOAlert(bool force) { - if (!this->parent->IsTickOnly()) + if (!this->pParent->IsTickOnly()) { - if (!this->parent->bFrameStart) + if (!this->pParent->bFrameStart) { if (force) { - this->parent->TickFor(AuSharedFromThis()); + this->pParent->TickFor(AuSharedFromThis()); } else { - this->parent->TickForHack(AuSharedFromThis()); + this->pParent->TickForHack(AuSharedFromThis()); } } else { - this->parent->TickForRegister(AuSharedFromThis()); + this->pParent->TickForRegister(AuSharedFromThis()); } } else if (force) // manual tick { - if (this->parent->CheckThread()) + if (this->pParent->CheckThread()) { - this->parent->TickFor(AuSharedFromThis()); + this->pParent->TickFor(AuSharedFromThis()); } - else if (parent->IsAsync()) + else if (this->pParent->IsAsync()) { // TODO: } @@ -99,8 +99,7 @@ namespace Aurora::IO } } - - if (this->singleshot) + if (this->bSingleshot) { StopWatch(); } diff --git a/Source/IO/IOProcessorItem.hpp b/Source/IO/IOProcessorItem.hpp index 35a96147..301fa8ff 100644 --- a/Source/IO/IOProcessorItem.hpp +++ b/Source/IO/IOProcessorItem.hpp @@ -13,12 +13,12 @@ namespace Aurora::IO struct IOProcessorItem : AuLoop::ILoopSourceSubscriberEx, IIOProcessorItem, AuEnableSharedFromThis { - IOProcessor *parent; - bool singleshot{}; - bool triggered{}; + IOProcessor *pParent; + bool bSingleshot{}; + bool bTriggered{}; - AuSPtr item; - AuSPtr listener; + AuSPtr pItem; + AuSPtr pListener; // ILoopSourceSubscriber bool OnFinished(const AuSPtr &source, AuUInt8 pos) override; @@ -34,5 +34,4 @@ namespace Aurora::IO // void IOAlert(bool force); }; - } \ No newline at end of file diff --git a/Source/IO/IOProcessorTimers.cpp b/Source/IO/IOProcessorTimers.cpp index 61c37a9c..8ff7b0b1 100644 --- a/Source/IO/IOProcessorTimers.cpp +++ b/Source/IO/IOProcessorTimers.cpp @@ -12,16 +12,16 @@ namespace Aurora::IO { - bool IOProcessorTimers::Init(IOProcessor *parent, AuSPtr lsTicker) + bool IOProcessorTimers::Init(IOProcessor *pParent, AuSPtr pLsTicker) { - this->parent = parent; - this->lsTicker = lsTicker; - return bool(this->parent) && bool(lsTicker); + this->pParent = pParent; + this->pLsTicker = pLsTicker; + return bool(this->pParent) && bool(pLsTicker); } - bool IOProcessorTimers::OnFinished(const AuSPtr &source) + bool IOProcessorTimers::OnFinished(const AuSPtr &pSource) { - this->parent->ManualTick(); + this->pParent->ManualTick(); return false; } } \ No newline at end of file diff --git a/Source/IO/IOProcessorTimers.hpp b/Source/IO/IOProcessorTimers.hpp index 7bb7ecde..64c69235 100644 --- a/Source/IO/IOProcessorTimers.hpp +++ b/Source/IO/IOProcessorTimers.hpp @@ -13,8 +13,8 @@ namespace Aurora::IO struct IOProcessorTimers : AuLoop::ILoopSourceSubscriber { - IOProcessor *parent {}; - AuSPtr lsTicker; + IOProcessor *pParent {}; + AuSPtr pLsTicker; Utility::RateLimiter nbTicker; bool Init(IOProcessor *parent, AuSPtr lsTicker); diff --git a/Source/IO/IOSimpleEventListener.cpp b/Source/IO/IOSimpleEventListener.cpp index ee6cf29f..98cf35b2 100644 --- a/Source/IO/IOSimpleEventListener.cpp +++ b/Source/IO/IOSimpleEventListener.cpp @@ -11,7 +11,7 @@ namespace Aurora::IO { - IOSimpleEventListener::IOSimpleEventListener(const AuSPtr &parent) : parent(parent) + IOSimpleEventListener::IOSimpleEventListener(const AuSPtr &pParent) : pParent(pParent) { } @@ -33,7 +33,7 @@ namespace Aurora::IO void IOSimpleEventListener::Tick_Any() { - this->parent->OnIOTick(); + this->pParent->OnIOTick(); } void IOSimpleEventListener::Tick_FrameEpilogue() @@ -43,21 +43,21 @@ namespace Aurora::IO void IOSimpleEventListener::OnFailureCompletion() { - this->parent->OnIOFailure(); + this->pParent->OnIOFailure(); } void IOSimpleEventListener::OnNominalCompletion() { - this->parent->OnIOComplete(); + this->pParent->OnIOComplete(); } - AuSPtr DesimplifyIOEventListenerAdapter(const AuSPtr &interface) + AuSPtr DesimplifyIOEventListenerAdapter(const AuSPtr &pInterface) { - if (!interface) + if (!pInterface) { return {}; } - return AuMakeShared(interface); + return AuMakeShared(pInterface); } } \ No newline at end of file diff --git a/Source/IO/IOSimpleEventListener.hpp b/Source/IO/IOSimpleEventListener.hpp index 4db489d0..8827347c 100644 --- a/Source/IO/IOSimpleEventListener.hpp +++ b/Source/IO/IOSimpleEventListener.hpp @@ -11,7 +11,7 @@ namespace Aurora::IO { struct IOSimpleEventListener : IIOEventListener { - IOSimpleEventListener(const AuSPtr &parent); + IOSimpleEventListener(const AuSPtr &pParent); void Tick_RunOnTick() override; void Tick_OtherIOEvent() override; @@ -21,6 +21,6 @@ namespace Aurora::IO void OnFailureCompletion() override; void OnNominalCompletion() override; - AuSPtr parent; + AuSPtr pParent; }; } \ No newline at end of file diff --git a/Source/IO/Net/AuNetSocketChannelInput.cpp b/Source/IO/Net/AuNetSocketChannelInput.cpp index 44361c63..216d4dfa 100644 --- a/Source/IO/Net/AuNetSocketChannelInput.cpp +++ b/Source/IO/Net/AuNetSocketChannelInput.cpp @@ -77,12 +77,12 @@ namespace Aurora::IO::Net auto sharedThis = AuSPtr(this->pParent_->SharedFromThis(), this); AuIO::IOPipeRequestAIO req; - req.output.handleBufferedStream.onData = AuUnsafeRaiiToShared(this); + req.output.handleBufferedStream.pOnData = AuUnsafeRaiiToShared(this); req.output.type = EPipeCallbackType::eTryHandleBufferedPart; - req.asyncTransaction = this->pNetReadTransaction; - req.isStream = true; - req.listener = sharedThis; - req.bufferLengthOrZero = AuStaticCast(this->pParent_->ToChannel())->uBytesInputBuffer; + req.pAsyncTransaction = this->pNetReadTransaction; + req.bIsStream = true; + req.pListener = sharedThis; + req.uBufferLengthOrZero = AuStaticCast(this->pParent_->ToChannel())->uBytesInputBuffer; this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(req); } @@ -99,6 +99,7 @@ namespace Aurora::IO::Net this->pParent_->SendErrorNoStream({}); return; } + this->pNetReadTransaction->SetCallback(sharedThis);//; AuSPtr(this->pParent_->SharedFromThis(), this)); IncrementWorker(); }