diff --git a/Include/Aurora/Async/IWorkItemHandler.hpp b/Include/Aurora/Async/IWorkItemHandler.hpp index 1d3e3f56..87ac4ce3 100644 --- a/Include/Aurora/Async/IWorkItemHandler.hpp +++ b/Include/Aurora/Async/IWorkItemHandler.hpp @@ -40,8 +40,8 @@ namespace Aurora::Async /// A really terrible name for the overloadable method that serves as the critical failure callback /// This may run from any thread - virtual void OnFailure() {}; + inline virtual void OnFailure() {}; - virtual void *GetPrivateData() { return nullptr; } + inline virtual void *GetPrivateData() { return nullptr; } }; } \ No newline at end of file diff --git a/Include/Aurora/Console/ConsoleTTY/IConsoleTTY.hpp b/Include/Aurora/Console/ConsoleTTY/IConsoleTTY.hpp index 05d7a987..b2e59483 100644 --- a/Include/Aurora/Console/ConsoleTTY/IConsoleTTY.hpp +++ b/Include/Aurora/Console/ConsoleTTY/IConsoleTTY.hpp @@ -65,11 +65,11 @@ namespace Aurora::Console::ConsoleTTY virtual bool GetLeftBorder() = 0; virtual bool GetRightBorder() = 0; virtual bool GetBottomBorder() = 0; - virtual bool GetBannerFootBorder() = 0; - + virtual AuUInt8 GetPaddingHeadOfLog(AuUInt8 newValue) = 0; + virtual AuUInt8 GetPaddingTopOfLog(AuUInt8 newValue) = 0; virtual AuUInt8 SetPaddingLeftOfLog(AuUInt8 newValue) = 0; virtual AuUInt8 SetPaddingRightOfLog(AuUInt8 newValue) = 0; virtual AuUInt8 SetPaddingLeftOfInput(AuUInt8 newValue) = 0; @@ -79,7 +79,8 @@ namespace Aurora::Console::ConsoleTTY virtual AuUInt8 SetPaddingMidOfHeader(AuUInt8 newValue) = 0; virtual AuUInt8 SetPaddingBottomOfSubheader(AuUInt8 newValue) = 0; - + virtual AuUInt8 GetPaddingHeadOfLog() = 0; + virtual AuUInt8 GetPaddingTopOfLog() = 0; virtual AuUInt8 GetPaddingLeftOfLog() = 0; virtual AuUInt8 GetPaddingRightOfLog() = 0; virtual AuUInt8 GetPaddingLeftOfInput() = 0; diff --git a/Include/Aurora/IO/EIOProcessorEventStage.hpp b/Include/Aurora/IO/EIOProcessorEventStage.hpp new file mode 100644 index 00000000..973a6324 --- /dev/null +++ b/Include/Aurora/IO/EIOProcessorEventStage.hpp @@ -0,0 +1,19 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: EIOProcessorEventStage.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUE_DEFINE(EIOProcessorEventStage, ( + eFrameStartOfFrame, + eFrameYieldIO, + eFrameIODone, + eFrameWorkDone, + eFrameEndOfFrame + )); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IAsyncTransaction.hpp b/Include/Aurora/IO/IAsyncTransaction.hpp index 27844403..f0aadd81 100644 --- a/Include/Aurora/IO/IAsyncTransaction.hpp +++ b/Include/Aurora/IO/IAsyncTransaction.hpp @@ -32,6 +32,18 @@ namespace Aurora::IO */ virtual bool Complete() = 0; + /** + * @brief + * @return + */ + virtual bool Failed() = 0; + + /** + * @brief + * @return + */ + virtual AuUInt GetOSErrorCode() = 0; + /** * @brief Returns the last packets length assuming ::Complete() is true or you are within the registered callback */ diff --git a/Include/Aurora/IO/IByteBufferStreamPair.hpp b/Include/Aurora/IO/IByteBufferStreamPair.hpp new file mode 100644 index 00000000..9560f670 --- /dev/null +++ b/Include/Aurora/IO/IByteBufferStreamPair.hpp @@ -0,0 +1,24 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IByteBufferStreamPair.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IByteBufferStreamPair + { + virtual AuSPtr ToStreamReader() = 0; + virtual AuSPtr ToSeekingReader() = 0; + virtual AuSPtr ToStreamWriter() = 0; + + virtual AuSPtr ToByteBuffer() = 0; + }; + + AUKN_SYM AuSPtr NewByteBufferPair(); + AUKN_SYM AuSPtr NewByteBufferPairEx(AuUInt length, bool permitResize); + AUKN_SYM AuSPtr NewRingByteBuffer(AuUInt length); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOBufferedProcessor.hpp b/Include/Aurora/IO/IIOBufferedProcessor.hpp new file mode 100644 index 00000000..527883de --- /dev/null +++ b/Include/Aurora/IO/IIOBufferedProcessor.hpp @@ -0,0 +1,23 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOBufferedProcessor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOBufferedProcessor + { + virtual AuUInt32 TryProcessBuffered() = 0; + virtual AuUInt32 GetRawBytesBuffered() = 0; + virtual AuUInt32 GetRawBytesLimit() = 0; + }; + + AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &source, + const AuSPtr &processor, + const AuSPtr &drain, + AuUInt32 bufferSize); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOEventListener.hpp b/Include/Aurora/IO/IIOEventListener.hpp new file mode 100644 index 00000000..7f7213fd --- /dev/null +++ b/Include/Aurora/IO/IIOEventListener.hpp @@ -0,0 +1,26 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOEventListener.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUKN_INTERFACE(IIOEventListener, + AUI_METHOD(void, Tick_RunOnTick, ()), // called on each tick, if registered by the iWaitableitem interface + + AUI_METHOD(void, Tick_OtherIOEvent, ()), // called if a different object woke us up + AUI_METHOD(void, Tick_SelfIOEvent, ()), // called if a ticker, io loop source, or other self-iWaitableitem interface poked us + + AUI_METHOD(void, Tick_Any, ()), // always called if any of the above are called + + AUI_METHOD(void, Tick_FrameEpilogue, ()), // called at the end of the tick, once every IOEventListener has had time to fire + + + AUI_METHOD(void, OnFailureCompletion, ()), + AUI_METHOD(void, OnNominalCompletion, ()) + ); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOPipeEventListener.hpp b/Include/Aurora/IO/IIOPipeEventListener.hpp new file mode 100644 index 00000000..4eb2b6ab --- /dev/null +++ b/Include/Aurora/IO/IIOPipeEventListener.hpp @@ -0,0 +1,19 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOPipeEventListener.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IOPipeData; + + AUKN_INTERFACE(IIOPipeEventListener, + AUI_METHOD(void, OnPipePartialEvent, (const IOPipeData &, current, AuUInt, tranferred)), + AUI_METHOD(void, OnPipeSuccessEvent, (const IOPipeData &, current)), + AUI_METHOD(void, OnPipeFailureEvent, (const IOPipeData &, current)) + ); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOPipeInterceptor.hpp b/Include/Aurora/IO/IIOPipeInterceptor.hpp new file mode 100644 index 00000000..916196db --- /dev/null +++ b/Include/Aurora/IO/IIOPipeInterceptor.hpp @@ -0,0 +1,16 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOPipeInterceptor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUKN_INTERFACE(IIOPipeInterceptor, + AUI_METHOD(bool, OnDataAvailable, (const Memory::MemoryViewStreamRead &, view, + const AuSPtr &, out)) + ); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOPipeProcessor.hpp b/Include/Aurora/IO/IIOPipeProcessor.hpp new file mode 100644 index 00000000..ad8774e3 --- /dev/null +++ b/Include/Aurora/IO/IIOPipeProcessor.hpp @@ -0,0 +1,72 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOPipeProcessor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOPipeInterceptor; + + struct IOPipeData + { + AuSPtr watchItem; + AuSPtr reader; + AuSPtr writer; + }; + + struct IOPipeRequest + { + /** + * @brief The two streams to join and an invokable object + */ + IOPipeData data; + + /** + * @brief Amount of bytes to transfer + */ + AuUInt32 lengthOrZero {}; + + /** + * @brief event listener + */ + AuSPtr listener; + + /** + * @brief Used as the buffer size for streams of length 0 + */ + AuUInt32 fallbackPageSize {4096 * 50}; + + AuSPtr processor; + }; + + /** + * @brief Different operating systems implement high level stream copy abstraction between network, file, and/or file descriptors. + * This interface connects arbitrary stream objects to one another by piping data under an iprocessor tick; or delegates + * such task to the operating system, if possible. + */ + struct IIOPipeProcessor + { + /** + * @brief + * @param request + * @return + */ + virtual bool BeginPipe(const IOPipeRequest &request) = 0; + + /** + * @brief + * @param itm + */ + virtual void EndByWatch(const AuSPtr &itm) = 0; + + /** + * @brief + * @param itm + */ + virtual void EndByListener(const AuSPtr &itm) = 0; + }; +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOProcessor.hpp b/Include/Aurora/IO/IIOProcessor.hpp new file mode 100644 index 00000000..bc2e916e --- /dev/null +++ b/Include/Aurora/IO/IIOProcessor.hpp @@ -0,0 +1,46 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOProcessor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOProcessor + { + virtual AuUInt32 TryTick() = 0; + virtual AuUInt32 RunTick() = 0; + virtual AuUInt32 TickFor(const AuSPtr &ioEvent) = 0; + + virtual AuUInt32 ManualTick() = 0; + + virtual AuUInt64 SetRefreshRate(AuUInt64 ns) = 0; + virtual bool HasRefreshRate() = 0; + + virtual bool MultiplexRefreshRateWithIOEvents() = 0; + + virtual AuUInt64 GetOwnedThreadId() = 0; + + virtual AuSPtr StartIOWatch(const AuSPtr &object, const AuSPtr &listener) = 0; + + // Inter-frame callbacks indicating point of execution throughout the [START](optional [yield])[IO][TICK][EPILOGUE][END OF FRAME] frame + virtual bool AddEventListener (const AuSPtr &eventListener) = 0; + virtual void RemoveEventListener(const AuSPtr &eventListener) = 0; + + virtual AuSPtr ToPipeProcessor() = 0; + + // Reference loop queue + virtual AuSPtr ToQueue() = 0; + + virtual void ReleaseAllWatches() = 0; + + virtual bool HasItems() = 0; + }; + + AUKN_SYM AuSPtr NewIOProcessor(bool tickOnly, const AuSPtr &queue); + AUKN_SYM AuSPtr NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id); + AUKN_SYM AuSPtr NewIOProcessorNoQueue(bool tickOnly); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOProcessorEventListener.hpp b/Include/Aurora/IO/IIOProcessorEventListener.hpp new file mode 100644 index 00000000..8aa2523c --- /dev/null +++ b/Include/Aurora/IO/IIOProcessorEventListener.hpp @@ -0,0 +1,15 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOProcessorEventListener.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUKN_INTERFACE(IIOProcessorEventListener, + AUI_METHOD(void, OnIOProcessorStateEvent, (EIOProcessorEventStage, stage)) + ); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOProcessorItem.hpp b/Include/Aurora/IO/IIOProcessorItem.hpp new file mode 100644 index 00000000..811f1ff8 --- /dev/null +++ b/Include/Aurora/IO/IIOProcessorItem.hpp @@ -0,0 +1,17 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOProcessorItem.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOProcessorItem : IIOProcessorManualInvoker + { + virtual bool StopWatch() = 0; + virtual bool FailWatch() = 0; + }; +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOProcessorManualInvoker.hpp b/Include/Aurora/IO/IIOProcessorManualInvoker.hpp new file mode 100644 index 00000000..a8c585a6 --- /dev/null +++ b/Include/Aurora/IO/IIOProcessorManualInvoker.hpp @@ -0,0 +1,16 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOProcessorManualInvoker.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOProcessorManualInvoker + { + virtual void InvokeManualTick() = 0; + }; +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOWaitableIOLoopSource.hpp b/Include/Aurora/IO/IIOWaitableIOLoopSource.hpp new file mode 100644 index 00000000..3040c2c8 --- /dev/null +++ b/Include/Aurora/IO/IIOWaitableIOLoopSource.hpp @@ -0,0 +1,19 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOWatachableIOLoopSource.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOWatachableIOLoopSource : IIOWaitableItem + { + virtual AuSPtr GetLoopSource() = 0; + virtual AuSPtr SetLoopSource(const AuSPtr &ls) = 0; + }; + + AUKN_SYM AuSPtr NewWaitableLoopSource(const AuSPtr &ptr); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOWaitableIOTimer.hpp b/Include/Aurora/IO/IIOWaitableIOTimer.hpp new file mode 100644 index 00000000..ff956d81 --- /dev/null +++ b/Include/Aurora/IO/IIOWaitableIOTimer.hpp @@ -0,0 +1,19 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOWaitableIOTimer.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOWaitableIOTimer : IIOWaitableItem + { + virtual AuUInt64 SetConstantTick(AuUInt64 ns) = 0; + virtual AuUInt64 SetTargetTimeAbs(AuUInt64 ns) = 0; + }; + + AUKN_SYM AuSPtr NewWaitableIOTimer(); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOWaitableItem.hpp b/Include/Aurora/IO/IIOWaitableItem.hpp new file mode 100644 index 00000000..e046e0f3 --- /dev/null +++ b/Include/Aurora/IO/IIOWaitableItem.hpp @@ -0,0 +1,34 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOWaitableItem.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + + // Unstable and hacky interface + // You shouldn't try to implement this yourself + // Defer to IIOWaitableIOLoopSource, IIOWaitableIOTimer, and IIOWaitableTickLimiter for intended impl usage + + AUKN_INTERFACE(IIOWaitableItem, + AUI_METHOD(bool, IsRunOnTick, ()), + + AUI_METHOD(bool, IsRunOnOtherTick, ()), + + AUI_METHOD(bool, CanRequestTick, ()), + AUI_METHOD(void, OnReportPumper, (const AuSPtr &, iface)), + + AUI_METHOD(bool, IsRunOnSelfIO, ()), + AUI_METHOD(AuSPtr, GetSelfIOSource, ()), + AUI_METHOD(bool, IsRunOnSelfIOCheckedOnTimerTick, ()), + + AUI_METHOD(bool, ApplyRateLimit, ()) + ); + + + +} \ No newline at end of file diff --git a/Include/Aurora/IO/IIOWaitableTickLimiter.hpp b/Include/Aurora/IO/IIOWaitableTickLimiter.hpp new file mode 100644 index 00000000..510be4ea --- /dev/null +++ b/Include/Aurora/IO/IIOWaitableTickLimiter.hpp @@ -0,0 +1,20 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOWaitableTickLimiter.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IIOWaitableTickLimiter : IIOWaitableItem + { + virtual AuUInt64 SetConstantTick(AuUInt64 ns) = 0; + virtual AuUInt64 SetMinTime(AuUInt64 ns) = 0; + virtual AuUInt32 SetMinTickDelta(AuUInt32 tickDelta) = 0; + }; + + AUKN_SYM AuSPtr NewWaitableTickLimiter(); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterAsyncStream.hpp b/Include/Aurora/IO/IOAdapterAsyncStream.hpp new file mode 100644 index 00000000..fbd9cdb0 --- /dev/null +++ b/Include/Aurora/IO/IOAdapterAsyncStream.hpp @@ -0,0 +1,33 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterAsyncStream.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IAsyncStreamAdapater + { + virtual bool SetFlushOnWrite(bool value) = 0; + + virtual void ReserveBuffer(AuUInt length) = 0; + + virtual AuUInt GetReadOffset() = 0; + virtual AuUInt SetReadOffset(AuUInt offset) = 0; + + virtual AuUInt GetWriteOffset() = 0; + virtual AuUInt SetWriteOffset(AuUInt offset) = 0; + + virtual AuSPtr ToStreamReader() = 0; + virtual AuSPtr ToStreamWriter() = 0; + + virtual AuSPtr ToWaitable() = 0; + + virtual bool Reset() = 0; + }; + + AUKN_SYM AuSPtr NewAsyncStreamAdapter(const AuSPtr &transaction, bool isStream); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterByteBuffer.hpp b/Include/Aurora/IO/IOAdapterByteBuffer.hpp new file mode 100644 index 00000000..09d816a8 --- /dev/null +++ b/Include/Aurora/IO/IOAdapterByteBuffer.hpp @@ -0,0 +1,15 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterByteBuffer.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUKN_SYM AuSPtr NewByteBufferReadAdapter(const AuSPtr &buffer); + AUKN_SYM AuSPtr NewByteBufferLinearSeekableAdapter(const AuSPtr &buffer); + AUKN_SYM AuSPtr NewByteBufferWriteAdapter(const AuSPtr &buffer); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterCompression.hpp b/Include/Aurora/IO/IOAdapterCompression.hpp new file mode 100644 index 00000000..480e220b --- /dev/null +++ b/Include/Aurora/IO/IOAdapterCompression.hpp @@ -0,0 +1,14 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterCompression.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUKN_SYM AuSPtr NewCompressionReadAdapter(const AuSPtr &compresionStream); + AUKN_SYM AuSPtr NewCompressionSeekingAdapter(const AuSPtr &compresionStream); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IOAdapterSeeking.hpp b/Include/Aurora/IO/IOAdapterSeeking.hpp new file mode 100644 index 00000000..d2626967 --- /dev/null +++ b/Include/Aurora/IO/IOAdapterSeeking.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterCompression.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUKN_SYM AuSPtr NewSeekingReadAdapter(const AuSPtr &reader); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IOExperimental.hpp b/Include/Aurora/IO/IOExperimental.hpp new file mode 100644 index 00000000..b801f7b5 --- /dev/null +++ b/Include/Aurora/IO/IOExperimental.hpp @@ -0,0 +1,41 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOExperimental.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +#include "IIOProcessorManualInvoker.hpp" + +#include "IIOWaitableItem.hpp" +#include "IIOEventListener.hpp" + +#include "EIOProcessorEventStage.hpp" +#include "IIOPipeEventListener.hpp" + +#include "IIOPipeProcessor.hpp" + +#include "IIOProcessorEventListener.hpp" +#include "IIOProcessorItem.hpp" +#include "IIOProcessor.hpp" + +//#include "IBufferedWriterToReader.hpp" +//#include "IBufferedReadProcessorToReader.hpp" + +#include "IIOPipeInterceptor.hpp" +#include "IIOBufferedProcessor.hpp" + +#include "IByteBufferStreamPair.hpp" + +#include "IOAdapterAsyncStream.hpp" +#include "IOAdapterByteBuffer.hpp" +#include "IOAdapterCompression.hpp" +#include "IOAdapterSeeking.hpp" + +#include "IIOWaitableTickLimiter.hpp" +#include "IIOWaitableIOTimer.hpp" +#include "IIOWaitableIOLoopSource.hpp" + +#include "IOPipeInterceptorNop.hpp" \ No newline at end of file diff --git a/Include/Aurora/IO/IOPipeInterceptorNop.hpp b/Include/Aurora/IO/IOPipeInterceptorNop.hpp new file mode 100644 index 00000000..8dfe7b54 --- /dev/null +++ b/Include/Aurora/IO/IOPipeInterceptorNop.hpp @@ -0,0 +1,18 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IIOPipeInterceptor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUE_DEFINE(EPipeNopStrategy, ( + eBuffered, + eWriteAll + )); + + AuSPtr NewPipeInterceptorNopCopier(EPipeNopStrategy mode); +} \ No newline at end of file diff --git a/Include/Aurora/IO/IOSleep.hpp b/Include/Aurora/IO/IOSleep.hpp index 7099f74b..fa8be86f 100644 --- a/Include/Aurora/IO/IOSleep.hpp +++ b/Include/Aurora/IO/IOSleep.hpp @@ -15,4 +15,6 @@ namespace Aurora::IO * @return true on preemption, false on timeout */ AUKN_SYM bool WaitFor(AuUInt32 milliseconds, bool waitEntireFrame = true); + + AUKN_SYM bool IOYield(); } \ No newline at end of file diff --git a/README.md b/README.md index b9a4e8cd..182af2ac 100644 --- a/README.md +++ b/README.md @@ -308,7 +308,7 @@ hacks we have available (outside of ripping the compiler apart to emit special d experiment worth trying now that modern hardware can make up for software and microcode flaws; and architecture translation. Most users probably wont even notice the performance loss, until it saves them from a hard crash and they realize dereferences are bloated. This is default behaviour, and can be easily disabled or configured from within your ecosystem's AuroraConfiguration.h to globally -modify behaviour and subsequent ABI of the AuSPtr's. +modify the behaviour and subsequent ABI of the AuSPtr's. ``` diff --git a/Source/Async/ThreadPool.cpp b/Source/Async/ThreadPool.cpp index 47d73242..41255eb4 100644 --- a/Source/Async/ThreadPool.cpp +++ b/Source/Async/ThreadPool.cpp @@ -19,6 +19,7 @@ namespace Aurora::Async static thread_local AuWPtr gCurrentPool; static const auto kMagicResortThreshold = 15; + static thread_local int tlsCallStack; inline auto GetWorkerInternal(const AuSPtr &pool) { if (pool.get() == AuStaticCast(gAsyncApp)) @@ -134,6 +135,7 @@ namespace Aurora::Async if (!AuTryInsert(state->workQueue, AuMakePair(target.second, runnable))) { + DecrementTasksRunning(); runnable->CancelAsync(); return; } @@ -248,7 +250,6 @@ namespace Aurora::Async } bool success {}; - auto runMode = GetCurrentThreadRunMode(); do @@ -461,6 +462,22 @@ namespace Aurora::Async int start = state->cookie; + // Account for + // while (AuAsync.GetCurrentPool()->runForever()); + // in the first task (or deeper) + if (InRunnerMode() && tlsCallStack) // are we one call deep? + { + auto queue = ToKernelWorkQueue(); + + if ((this->tasksRunning_ == tlsCallStack) && + (!queue || queue->GetSourceCount() <= 1)) + { + return false; + } + } + + // + for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); ) { if (state->threadObject->Exiting() || state->shuttingdown) @@ -510,12 +527,16 @@ namespace Aurora::Async // Remove from our local job queue itr = state->pendingWorkItems.erase(itr); + tlsCallStack++; + // Dispatch oops->RunAsync(); // Atomically decrement global task counter runningTasks = this->tasksRunning_.fetch_sub(1) - 1; + tlsCallStack--; + if (start != state->cookie) { start = state->cookie; @@ -523,7 +544,6 @@ namespace Aurora::Async } } - gCurrentPool = oldTlsHandle; // Return popped work back to the groups work pool when our -pump loops were preempted @@ -538,9 +558,17 @@ namespace Aurora::Async CtxPollReturn(state, magic, true); + + // Account for + // while (AuAsync.GetCurrentPool()->runForever()); + // in the top most task if (InRunnerMode()) { - if (runningTasks == 0) + auto queue = ToKernelWorkQueue(); + + if ((runningTasks == 0) && + (this->tasksRunning_ == 0 ) && + (!queue || queue->GetSourceCount() <= 1)) { Shutdown(); } @@ -760,7 +788,9 @@ namespace Aurora::Async feature->Init(); })); + auto workItem = this->NewWorkItem(id, work, !async)->Dispatch(); + SysAssert(workItem); if (!async) { diff --git a/Source/Console/ConsoleTTY/ConsoleTTY.cpp b/Source/Console/ConsoleTTY/ConsoleTTY.cpp index 22444b70..63a2973f 100644 --- a/Source/Console/ConsoleTTY/ConsoleTTY.cpp +++ b/Source/Console/ConsoleTTY/ConsoleTTY.cpp @@ -1873,6 +1873,25 @@ namespace Aurora::Console::ConsoleTTY return this->bottomSubHeaderPadding; } + AuUInt8 TTYConsole::GetPaddingHeadOfLog(AuUInt8 newValue) + { + return AuExchange(this->topLogPadding, newValue); + } + + AuUInt8 TTYConsole::GetPaddingTopOfLog(AuUInt8 newValue) + { + return AuExchange(this->topLogPaddingExtra, newValue); + } + + AuUInt8 TTYConsole::GetPaddingHeadOfLog() + { + return this->topLogPadding; + } + + AuUInt8 TTYConsole::GetPaddingTopOfLog() + { + return this->topLogPaddingExtra; + } void Init() { diff --git a/Source/Console/ConsoleTTY/ConsoleTTY.hpp b/Source/Console/ConsoleTTY/ConsoleTTY.hpp index 31b30ee2..02a64cb7 100644 --- a/Source/Console/ConsoleTTY/ConsoleTTY.hpp +++ b/Source/Console/ConsoleTTY/ConsoleTTY.hpp @@ -128,26 +128,30 @@ namespace Aurora::Console::ConsoleTTY virtual bool SetLeftBorder(bool newValue) override; virtual bool SetRightBorder(bool newValue) override; - AuUInt8 SetPaddingLeftOfLog(AuUInt8 newValue) override; - AuUInt8 SetPaddingRightOfLog(AuUInt8 newValue) override; - AuUInt8 SetPaddingLeftOfInput(AuUInt8 newValue) override; - AuUInt8 SetPaddingTopOfInput(AuUInt8 newValue) override; - AuUInt8 SetPaddingTopOfHint(AuUInt8 newValue) override; - AuUInt8 SetPaddingTopOfHeader(AuUInt8 newValue) override; - AuUInt8 SetPaddingMidOfHeader(AuUInt8 newValue) override; - AuUInt8 SetPaddingBottomOfSubheader(AuUInt8 newValue) override; - - - AuUInt8 GetPaddingLeftOfLog() override; - AuUInt8 GetPaddingRightOfLog() override; - AuUInt8 GetPaddingLeftOfInput() override; - AuUInt8 GetPaddingTopOfInput() override; - AuUInt8 GetPaddingTopOfHint() override; - AuUInt8 GetPaddingTopOfHeader() override; - AuUInt8 GetPaddingMidOfHeader() override; - AuUInt8 GetPaddingBottomOfSubheader() override; + virtual AuUInt8 SetPaddingLeftOfLog(AuUInt8 newValue) override; + virtual AuUInt8 SetPaddingRightOfLog(AuUInt8 newValue) override; + virtual AuUInt8 SetPaddingLeftOfInput(AuUInt8 newValue) override; + virtual AuUInt8 SetPaddingTopOfInput(AuUInt8 newValue) override; + virtual AuUInt8 SetPaddingTopOfHint(AuUInt8 newValue) override; + virtual AuUInt8 SetPaddingTopOfHeader(AuUInt8 newValue) override; + virtual AuUInt8 SetPaddingMidOfHeader(AuUInt8 newValue) override; + virtual AuUInt8 SetPaddingBottomOfSubheader(AuUInt8 newValue) override; + + virtual AuUInt8 GetPaddingLeftOfLog() override; + virtual AuUInt8 GetPaddingRightOfLog() override; + virtual AuUInt8 GetPaddingLeftOfInput() override; + virtual AuUInt8 GetPaddingTopOfInput() override; + virtual AuUInt8 GetPaddingTopOfHint() override; + virtual AuUInt8 GetPaddingTopOfHeader() override; + virtual AuUInt8 GetPaddingMidOfHeader() override; + virtual AuUInt8 GetPaddingBottomOfSubheader() override; + virtual AuUInt8 GetPaddingHeadOfLog(AuUInt8 newValue) override; + virtual AuUInt8 GetPaddingTopOfLog(AuUInt8 newValue) override; + virtual AuUInt8 GetPaddingHeadOfLog() override; + virtual AuUInt8 GetPaddingTopOfLog() override; + void WriteBuffered(AuPair pos, const AuString &in); void WriteLine(int Y, const AuString &in); void BlankLine(int Y, bool borders = true); @@ -168,7 +172,6 @@ namespace Aurora::Console::ConsoleTTY int oldWidth {}; int oldHeight {}; - int iHistoryPos {-1}; int iHistoryWritePos {0}; AuList history; @@ -235,8 +238,8 @@ namespace Aurora::Console::ConsoleTTY int leftLogPadding {1}; int rightLogPadding {1}; - int topLogPadding {0}; // extra padding, if displaying header - int topLogPaddingExtra {1}; // this is backwards + int topLogPadding {0}; // extra padding, if displaying header ... + int topLogPaddingExtra {1}; // this is backwards ... int leftInputPadding {0}; int topInputPadding {0}; diff --git a/Source/HWInfo/CpuInfo.Linux.cpp b/Source/HWInfo/CpuInfo.Linux.cpp index a5e46189..dc7ad6a8 100644 --- a/Source/HWInfo/CpuInfo.Linux.cpp +++ b/Source/HWInfo/CpuInfo.Linux.cpp @@ -21,13 +21,17 @@ namespace Aurora::HWInfo AuString contents; if (AuIOFS::ReadString(path, contents)) { - char *endPtr; - auto word = strtoll(contents.c_str(), &endPtr, 10); - if (errno == ERANGE) return 0; - return word; + return 0; } - return 0; + char *endPtr; + auto word = strtoll(contents.c_str(), &endPtr, 10); + if (errno == ERANGE) + { + return 0; + } + + return word; } static AuString ReadString(const AuString &path) @@ -79,7 +83,7 @@ namespace Aurora::HWInfo auto cpuId = CpuBitId(threadId); auto coreID = ReadUInt(kBasePath + coreStr + "/topology/core_id"); - auto cpuList = ReadString(kBasePath + coreStr +"/topology/core_cpus_list"); + auto cpuList = ReadString(kBasePath + coreStr + "/topology/core_cpus_list"); auto isHVCore = AuStringContains(cpuList, ","); diff --git a/Source/IO/ByteBufferStreamPair.cpp b/Source/IO/ByteBufferStreamPair.cpp new file mode 100644 index 00000000..5aee7444 --- /dev/null +++ b/Source/IO/ByteBufferStreamPair.cpp @@ -0,0 +1,84 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: ByteBufferStreamPair.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "ByteBufferStreamPair.hpp" + +namespace Aurora::IO +{ + ByteBufferStreamPair::ByteBufferStreamPair(const AuSPtr &buffer) : + buffer(buffer), + reader(buffer), + writer(buffer), + seeker(buffer) + { + + } + + AuSPtr ByteBufferStreamPair::ToStreamReader() + { + return AuSPtr(AuSharedFromThis(), AuStaticCast(&this->reader)); + } + + AuSPtr ByteBufferStreamPair::ToSeekingReader() + { + if (this->buffer->flagCircular) + { + SysPushErrorIO("ring buffer must not be seekable"); + return {}; + } + + return AuSPtr(AuSharedFromThis(), AuStaticCast(&this->seeker)); + } + + AuSPtr ByteBufferStreamPair::ToStreamWriter() + { + return AuSPtr(AuSharedFromThis(), AuStaticCast(&this->writer)); + } + + AuSPtr ByteBufferStreamPair::ToByteBuffer() + { + return this->buffer; + } + + AUKN_SYM AuSPtr NewByteBufferPair() + { + auto scalable = AuMakeShared(); + if (!scalable) + { + SysPushErrorMem(); + return {}; + } + + return AuMakeShared(scalable); + } + + AUKN_SYM AuSPtr NewByteBufferPairEx(AuUInt length, bool permitResize) + { + auto buffer = AuMakeShared(length, false, permitResize); + if (!buffer) + { + SysPushErrorMem(); + return {}; + } + + return AuMakeShared(buffer); + } + + AUKN_SYM AuSPtr NewRingByteBuffer(AuUInt length) + { + auto buffer = AuMakeShared(length, false, false); + if (!buffer) + { + SysPushErrorMem(); + return {}; + } + + return AuMakeShared(buffer); + } +} \ No newline at end of file diff --git a/Source/IO/ByteBufferStreamPair.hpp b/Source/IO/ByteBufferStreamPair.hpp new file mode 100644 index 00000000..75506f14 --- /dev/null +++ b/Source/IO/ByteBufferStreamPair.hpp @@ -0,0 +1,27 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: ByteBufferStreamPair.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct ByteBufferStreamPair : IByteBufferStreamPair, AuEnableSharedFromThis + { + ByteBufferStreamPair(const AuSPtr &buffer); + + AuSPtr ToStreamReader() override; + AuSPtr ToSeekingReader() override; + AuSPtr ToStreamWriter() override; + + AuSPtr ToByteBuffer() override; + + AuSPtr buffer; + Buffered::BlobReader reader; + Buffered::BlobWriter writer; + Buffered::BlobSeekableReader seeker; + }; +} \ No newline at end of file diff --git a/Source/IO/FS/Async.NT.cpp b/Source/IO/FS/Async.NT.cpp index 7001c463..49bc4239 100644 --- a/Source/IO/FS/Async.NT.cpp +++ b/Source/IO/FS/Async.NT.cpp @@ -240,7 +240,9 @@ namespace Aurora::IO::FS { that->pin_.reset(); that->Reset(); - SysPushErrorFIO("Async FIO error: {} {}", that->GetFileHandle()->path, GetLastError()); + that->osErrorCode = GetLastError(); + that->hasFailed = true; + SysPushErrorFIO("QoA async FIO error: {} {}", that->GetFileHandle()->path, that->osErrorCode); return false; } } @@ -253,6 +255,13 @@ namespace Aurora::IO::FS { auto transaction = reinterpret_cast(reinterpret_cast(lpOverlapped) - offsetof(NtAsyncFileTransaction, overlap_)); auto hold = AuExchange(transaction->pin_, {}); + + if (dwErrorCode) + { + hold->hasFailed = true; + hold->osErrorCode = dwErrorCode; + } + hold->Complete(); } @@ -289,6 +298,7 @@ namespace Aurora::IO::FS ::ResetEvent(this->event_); this->memoryHold_ = memoryView; + this->hasFailed = false; this->lastAbstractStat_ = memoryView->length; this->lastAbstractOffset_ = offset; @@ -310,6 +320,8 @@ namespace Aurora::IO::FS this->latch_ = false; ::ResetEvent(this->event_); + this->hasFailed = false; + this->memoryHold_ = memoryView; this->lastAbstractStat_ = memoryView->length; @@ -340,12 +352,23 @@ namespace Aurora::IO::FS void NtAsyncFileTransaction::Reset() { ::ResetEvent(this->event_); + this->hasFailed = false; this->lastAbstractStat_ = 0; // do not use latch } + bool NtAsyncFileTransaction::Failed() + { + return this->hasFailed; + } + + AuUInt NtAsyncFileTransaction::GetOSErrorCode() + { + return Failed() ? this->osErrorCode : ERROR_SUCCESS; + } + bool NtAsyncFileTransaction::Complete() { - DWORD read; + DWORD read {}; if (!this->lastAbstractStat_) { @@ -357,7 +380,8 @@ namespace Aurora::IO::FS return false; } - if (GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false)) + if ((this->hasFailed) || + ::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false)) { bool bLatched = this->latch_; DispatchCb(read); @@ -370,7 +394,12 @@ namespace Aurora::IO::FS AuUInt32 NtAsyncFileTransaction::GetLastPacketLength() { DWORD read {}; - GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false); + + if (!::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false)) + { + return {}; + } + return read; } @@ -386,7 +415,7 @@ namespace Aurora::IO::FS return true; } - auto ret = WaitForSingleObjectEx(this->event_, timeout ? timeout : INFINITE, true); + auto ret = ::WaitForSingleObjectEx(this->event_, timeout ? timeout : INFINITE, true); if (ret == WAIT_OBJECT_0) { return Complete(); @@ -438,7 +467,7 @@ namespace Aurora::IO::FS return {}; } - stream->Init(fileHandle); + stream->Init(fileHandle); return stream; } diff --git a/Source/IO/FS/Async.NT.hpp b/Source/IO/FS/Async.NT.hpp index 68617038..cba84688 100644 --- a/Source/IO/FS/Async.NT.hpp +++ b/Source/IO/FS/Async.NT.hpp @@ -47,7 +47,11 @@ namespace Aurora::IO::FS bool StartRead(AuUInt64 offset, const AuSPtr &memoryView) override; bool StartWrite(AuUInt64 offset, const AuSPtr &memoryView) override; - bool Complete() override; + bool Complete() override; + + bool Failed() override; + AuUInt GetOSErrorCode() override; + AuUInt32 GetLastPacketLength() override; void SetCallback(const AuSPtr &sub) override; @@ -70,6 +74,9 @@ namespace Aurora::IO::FS AuUInt32 lastAbstractStat_ {}, lastAbstractOffset_ {}; bool latch_ {}; + AuUInt32 osErrorCode {}; + bool hasFailed {}; + private: AuSPtr memoryHold_; AuSPtr handle_; diff --git a/Source/IO/IOAdapterAsyncStream.cpp b/Source/IO/IOAdapterAsyncStream.cpp new file mode 100644 index 00000000..466c699a --- /dev/null +++ b/Source/IO/IOAdapterAsyncStream.cpp @@ -0,0 +1,545 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterAsyncStream.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOAdapterAsyncStream.hpp" +#include "IOWaitableIOLoopSource.hpp" + +namespace Aurora::IO +{ + struct AsyncStreamAdapater; + + struct AsyncStreamReader : IStreamReader + { + AsyncStreamAdapater *parent; + + EStreamError IsOpen() override; + EStreamError Read(const Memory::MemoryViewStreamWrite ¶meters) override; + void Close() override; + }; + + struct AsyncStreamMemory : AuMemoryViewWrite + { + AsyncStreamMemory(AuUInt length); + ~AsyncStreamMemory(); + + bool IsValid(); + + AuUInt streamIndex {}; + }; + + struct AsyncStreamWriter : IStreamWriter + { + AsyncStreamAdapater *parent; + ~AsyncStreamWriter(); + + EStreamError IsOpen() override; + EStreamError Write(const Memory::MemoryViewStreamRead ¶meters) override; + void Close() override; + void Flush() override; + + void Preframe(); + void Frame(); + + AuList> writesPending; + bool HasWorkItems(); + }; + + struct AsyncStreamAdapater : IAsyncStreamAdapater, AuEnableSharedFromThis + { + AsyncStreamAdapater(); + + AuSPtr AllocateNextPageCached(AuUInt length); + + virtual AuSPtr ToStreamReader() override; + virtual AuSPtr ToStreamWriter() override; + + virtual AuSPtr ToWaitable() override; + + virtual bool Reset() override; + + bool Init(const AuSPtr &transaction, bool isStream); + + AuSPtr lastAllocation; + AuSPtr transaction; + + bool SetFlushOnWrite(bool value) override; + + void ReserveBuffer(AuUInt length) override; + + AuUInt GetReadOffset(); + AuUInt SetReadOffset(AuUInt offset); + + AuUInt GetWriteOffset(); + AuUInt SetWriteOffset(AuUInt offset); + + bool asyncActive {}; + AuUInt readOffset {}; + AuUInt writeOffset {}; + bool isStream {}; + + bool flushOnWrite {true}; + + AuOptionalEx errorCode; + int locked {}; + + AsyncStreamReader reader; + AsyncStreamWriter writer; + IOWatachableIOLoopSource source; + }; + + + AsyncStreamMemory::AsyncStreamMemory(AuUInt length) : AuMemoryViewWrite(AuMemory::ZAlloc(length), length) + { + + } + + AsyncStreamMemory::~AsyncStreamMemory() + { + if (this->ptr) + { + AuMemory::Free(this->ptr); + this->ptr = nullptr; + } + } + + bool AsyncStreamMemory::IsValid() + { + return bool(this->ptr); + } + + bool AsyncStreamAdapater::SetFlushOnWrite(bool value) + { + return AuExchange(this->flushOnWrite, value); + } + + AuUInt AsyncStreamAdapater::GetReadOffset() + { + return this->readOffset; + } + + AuUInt AsyncStreamAdapater::SetReadOffset(AuUInt offset) + { + if (this->locked == 1) + { + this->writer.Preframe(); + } + return AuExchange(this->readOffset, offset); + } + + AuUInt AsyncStreamAdapater::GetWriteOffset() + { + if (this->locked == 1) + { + this->writer.Preframe(); + } + return this->writeOffset; + } + + AuUInt AsyncStreamAdapater::SetWriteOffset(AuUInt offset) + { + return AuExchange(this->writeOffset, offset); + } + + bool AsyncStreamAdapater::Init(const AuSPtr &transaction, bool isStream) + { + this->transaction = transaction; + this->lastAllocation.reset(); + this->asyncActive = false; + this->reader.parent = this; + this->writer.parent = this; + return true; + } + + AsyncStreamAdapater::AsyncStreamAdapater() : source({}) + { + + } + + AuSPtr AsyncStreamAdapater::AllocateNextPageCached(AuUInt length) + { + if (this->lastAllocation) + { + if (this->lastAllocation->length >= length) + { + return this->lastAllocation; + } + } + + auto newMem = AuMakeShared(length); + if (!newMem) + { + SysPushErrorMem(); + return {}; + } + + if (!newMem->IsValid()) + { + SysPushErrorMem(); + return {}; + } + + return this->lastAllocation = newMem; + } + + EStreamError AsyncStreamReader::IsOpen() + { + return this->parent->errorCode.HasValue() ? + this->parent->errorCode.value() : + EStreamError::eErrorNone; + } + + EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite ¶meters) + { + if (!parameters.length) + { + SysPushErrorArg(); + return EStreamError::eErrorEndOfStream; + } + + // Read from the last tranaction, if not fully consumed + if (parent->lastAllocation) + { + auto length = parent->transaction->GetLastPacketLength(); + if (length && + parent->lastAllocation->streamIndex != length) + { + auto toRead = AuMin(parameters.length, length - parent->lastAllocation->streamIndex); + if (toRead) + { + if (parameters.ptr) + { + AuMemcpy(parameters.ptr, parent->lastAllocation->Begin() + parent->lastAllocation->streamIndex, toRead); + + if (parent->isStream) + { + parent->lastAllocation->streamIndex += toRead; + } + else + { + parent->lastAllocation->streamIndex += length; + } + } + } + + if (parent->isStream) + { + parent->lastAllocation->streamIndex += length; + } + + parameters.outVariable = toRead; + return EStreamError::eErrorNone; + } + + if (parent->transaction && parent->transaction->Failed()) + { + SysPushErrorIO("AIO transaction read failed: {}", parent->transaction->GetOSErrorCode()); + parent->errorCode = EStreamError::eErrorStreamInterrupted; + parent->lastAllocation.reset(); + } + } + + // Async error + if (parent->errorCode.HasValue()) + { + auto code = parent->isStream ? + parent->errorCode.Value() : + AuExchange(parent->errorCode, {}).Value(); + + if (code != EStreamError::eErrorNone) + { + return code; + } + } + + // Async awaiting response + if (parent->asyncActive && !parent->transaction->Complete()) + { + parameters.outVariable = 0; + return EStreamError::eErrorNone; + } + + // Async success or blank state + parent->transaction->Reset(); + parent->asyncActive = true; + + parent->lastAllocation = parent->AllocateNextPageCached(parameters.length); + parent->lastAllocation->streamIndex = 0; + + if (!parent->transaction->StartRead(parent->isStream ? 0 : parent->readOffset, parent->lastAllocation)) + { + parent->asyncActive = false; + SysPushErrorNested("Couldn't start async aio read"); + return EStreamError::eErrorStreamInterrupted; + } + + return EStreamError::eErrorNone; + } + + void AsyncStreamReader::Close() + { + + } + + AsyncStreamWriter::~AsyncStreamWriter() + { + Flush(); + } + + EStreamError AsyncStreamWriter::IsOpen() + { + return this->parent->errorCode.HasValue() ? + this->parent->errorCode.value() : + EStreamError::eErrorNone; + } + + EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead ¶meters) + { + if (!parameters.ptr) + { + return EStreamError::eErrorStreamInterrupted; + } + + Preframe(); + + if (parent->errorCode.HasValue()) + { + auto code = parent->isStream ? + parent->errorCode.Value() : + AuExchange(parent->errorCode, {}).Value(); + + if (code != EStreamError::eErrorNone) + { + return code; + } + } + + auto newMem = AuMakeShared(parameters.length); + if (!newMem) + { + SysPushErrorMem(); + return EStreamError::eErrorStreamInterrupted; + } + + if (!newMem->IsValid()) + { + SysPushErrorMem(); + return EStreamError::eErrorStreamInterrupted; + } + + AuMemcpy(newMem->ptr, parameters.ptr, parameters.length); + parameters.outVariable = parameters.length; + + if (!AuTryInsert(this->writesPending, newMem)) + { + SysPushErrorMem(); + return EStreamError::eErrorStreamInterrupted; + } + + if (this->parent->flushOnWrite) + { + Frame(); + } + + return EStreamError::eErrorNone; + } + + void AsyncStreamWriter::Flush() + { + Preframe(); + Frame(); + } + + void AsyncStreamWriter::Close() + { + Flush(); + } + + void AsyncStreamWriter::Preframe() + { + if (parent->transaction->Complete()) + { + if (parent->transaction->Failed()) + { + SysPushErrorIO("AIO transaction write failed: {}", parent->transaction->GetOSErrorCode()); + parent->errorCode = EStreamError::eErrorStreamInterrupted; + parent->lastAllocation.reset(); + } + else + { + parent->transaction->GetLastPacketLength(); + } + + parent->transaction->Reset(); + } + } + + void AsyncStreamWriter::Frame() + { + AuSPtr buffer; + + if (this->writesPending.size() == 1) + { + buffer = AuMove(this->writesPending[0]); + } + else + { + AuUInt length {}; + + for (auto &a : this->writesPending) + { + length += a->length; + } + + buffer = this->parent->AllocateNextPageCached(length); + if (!buffer) + { + return; + } + + AuUInt index {}; + + for (auto &a : this->writesPending) + { + if (a->length + index > buffer->length) + { + SysPanic(""); + } + + AuMemcpy(buffer->Begin() + index, a->ptr, a->length); + index += a->length; + } + + //.... + } + + // Async success or blank state + parent->transaction->Reset(); + parent->asyncActive = true; + + struct WriteMem : AuMemoryViewRead + { + AuSPtr write; + }; + + auto annoying = AuMakeShared(); + if (!annoying) + { + SysPushErrorMem(); + return; + } + + annoying->write = buffer; + annoying->ptr = buffer->ptr; + annoying->length = buffer->length; + + parent->lastAllocation = buffer; + parent->lastAllocation->streamIndex = 0; + + if (!parent->transaction->StartWrite(parent->isStream ? 0 : parent->writeOffset, annoying)) + { + parent->asyncActive = false; + SysPushErrorNested("Couldn't start async aio write"); + return; + } + + this->writesPending.clear(); + } + + bool AsyncStreamWriter::HasWorkItems() + { + return this->writesPending.size(); + } + + AuSPtr AsyncStreamAdapater::ToStreamReader() + { + if (this->locked != 0 && this->locked != 2) + { + return {}; + } + + this->locked = 2; + + return AuSPtr(AuSharedFromThis(), &this->reader); + } + + AuSPtr AsyncStreamAdapater::ToStreamWriter() + { + if (this->locked != 0 && this->locked != 1) + { + return {}; + } + + this->locked = 1; + + return AuSPtr(AuSharedFromThis(), &this->writer); + } + + AuSPtr AsyncStreamAdapater::ToWaitable() + { + this->source.SetLoopSource(this->transaction->NewLoopSource()); + return AuSPtr(AuSharedFromThis(), &this->source); + } + + void AsyncStreamAdapater::ReserveBuffer(AuUInt length) + { + if (!this->lastAllocation || !this->asyncActive) + { + this->lastAllocation = this->AllocateNextPageCached(length); + } + } + + bool AsyncStreamAdapater::Reset() + { + if (this->locked == 1) + { + if (this->writer.HasWorkItems()) + { + return false; + } + + this->writer.Flush(); + } + + if (this->asyncActive) + { + if (!this->transaction->Complete()) + { + return false; + } + } + + this->locked = 0; + this->transaction->Reset(); + this->writeOffset = 0; + this->readOffset = 0; + return true; + } + + AUKN_SYM AuSPtr NewAsyncStreamAdapter(const AuSPtr &transaction, bool isStream) + { + if (!transaction) + { + SysPushErrorArg(); + return {}; + } + + auto adapter = AuMakeShared(); + if (!adapter) + { + return {}; + } + + if (!adapter->Init(transaction, isStream)) + { + return {}; + } + + return adapter; + } +} \ No newline at end of file diff --git a/Source/IO/IOAdapterAsyncStream.hpp b/Source/IO/IOAdapterAsyncStream.hpp new file mode 100644 index 00000000..3e3df376 --- /dev/null +++ b/Source/IO/IOAdapterAsyncStream.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterAsyncStream.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + +} \ No newline at end of file diff --git a/Source/IO/IOAdapterByteBuffer.cpp b/Source/IO/IOAdapterByteBuffer.cpp new file mode 100644 index 00000000..08c43aa4 --- /dev/null +++ b/Source/IO/IOAdapterByteBuffer.cpp @@ -0,0 +1,52 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterByteBuffer.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOAdapterByteBuffer.hpp" + +namespace Aurora::IO +{ + AUKN_SYM AuSPtr NewByteBufferReadAdapter(const AuSPtr &buffer) + { + if (!buffer) + { + SysPushErrorArg(); + return {}; + } + + return AuMakeShared(buffer); + } + + AUKN_SYM AuSPtr NewByteBufferLinearSeekableAdapter(const AuSPtr &buffer) + { + if (!buffer) + { + SysPushErrorArg(); + return {}; + } + + if (buffer->flagCircular) + { + SysPushErrorIO("Seekable buffer must not be circular"); + return {}; + } + + return AuMakeShared(buffer); + } + + AUKN_SYM AuSPtr NewByteBufferWriteAdapter(const AuSPtr &buffer) + { + if (!buffer) + { + SysPushErrorArg(); + return {}; + } + + return AuMakeShared(buffer); + } +} \ No newline at end of file diff --git a/Source/IO/IOAdapterByteBuffer.hpp b/Source/IO/IOAdapterByteBuffer.hpp new file mode 100644 index 00000000..4c34ebf8 --- /dev/null +++ b/Source/IO/IOAdapterByteBuffer.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterByteBuffer.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + +} \ No newline at end of file diff --git a/Source/IO/IOAdapterCompression.cpp b/Source/IO/IOAdapterCompression.cpp new file mode 100644 index 00000000..9f82bcb7 --- /dev/null +++ b/Source/IO/IOAdapterCompression.cpp @@ -0,0 +1,103 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterCompression.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOAdapterCompression.hpp" + +namespace Aurora::IO +{ + CompressionStreamReader::CompressionStreamReader(const AuSPtr &compressionStream) : compressionStream(compressionStream) + { + + } + + EStreamError CompressionStreamReader::IsOpen() + { + return this->errored_ ? EStreamError::eErrorEndOfStream : EStreamError::eErrorNone; + } + + EStreamError CompressionStreamReader::Read(const Memory::MemoryViewStreamWrite ¶meters) + { + auto pair = this->compressionStream->ReadEx(parameters, true); + if (pair == AuStreamReadWrittenPair_t{}) + { + this->errored_ = true; + return EStreamError::eErrorEndOfStream; + } + + parameters.outVariable = pair.second; + return EStreamError::eErrorNone; + } + + void CompressionStreamReader::Close() + { + } + + CompressionSeekingReader::CompressionSeekingReader(const AuSPtr &compressionStream) : compressionStream(compressionStream) + { + + } + + EStreamError CompressionSeekingReader::IsOpen() + { + return this->errored_ ? + EStreamError::eErrorEndOfStream : + EStreamError::eErrorNone; + } + + EStreamError CompressionSeekingReader::ArbitraryRead(AuUInt offset, const Memory::MemoryViewStreamWrite ¶meters) + { + if (this->offset != offset) + { + if (this->offset > offset) + { + if (!this->compressionStream->GoBackByProcessedN(this->offset - offset)) + { + SysPushErrorIO("Negative compression seek out of bounds"); + return EStreamError::eErrorStreamInterrupted; + } + } + else + { + if (!this->compressionStream->GoForwardByProcessedN(offset - this->offset)) + { + SysPushErrorIO("Positive compression seek out of bounds"); + return EStreamError::eErrorStreamInterrupted; + } + } + + this->offset = offset; + } + + auto pair = this->compressionStream->ReadEx(parameters, true); + if (pair == AuStreamReadWrittenPair_t {}) + { + this->errored_ = true; + return EStreamError::eErrorEndOfStream; + } + + this->offset += pair.second; + parameters.outVariable = pair.second; + + return EStreamError::eErrorNone; + } + + void CompressionSeekingReader::Close() + { + } + + AUKN_SYM AuSPtr NewCompressionReadAdapter(const AuSPtr &compresionStream) + { + return AuMakeShared(compresionStream); + } + + AUKN_SYM AuSPtr NewCompressionSeekingAdapter(const AuSPtr &compresionStream) + { + return AuMakeShared(compresionStream); + } +} \ No newline at end of file diff --git a/Source/IO/IOAdapterCompression.hpp b/Source/IO/IOAdapterCompression.hpp new file mode 100644 index 00000000..31731a12 --- /dev/null +++ b/Source/IO/IOAdapterCompression.hpp @@ -0,0 +1,38 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterCompression.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct CompressionStreamReader : IStreamReader + { + AuSPtr compressionStream; + + CompressionStreamReader(const AuSPtr &compressionStream); + + EStreamError IsOpen() override; + EStreamError Read(const Memory::MemoryViewStreamWrite ¶meters) override; + void Close() override; + + bool errored_ {}; + }; + + struct CompressionSeekingReader : ISeekingReader + { + AuSPtr compressionStream; + + CompressionSeekingReader(const AuSPtr &compressionStream); + + EStreamError IsOpen() override; + EStreamError ArbitraryRead(AuUInt offset, const Memory::MemoryViewStreamWrite ¶meters) override; + void Close() override; + + bool errored_ {}; + AuUInt offset {}; + }; +} \ No newline at end of file diff --git a/Source/IO/IOAdapterSeeking.cpp b/Source/IO/IOAdapterSeeking.cpp new file mode 100644 index 00000000..bdfa4e63 --- /dev/null +++ b/Source/IO/IOAdapterSeeking.cpp @@ -0,0 +1,46 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterSeeking.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOAdapterSeeking.hpp" + +namespace Aurora::IO +{ + SeekingReader::SeekingReader(const AuSPtr &reader) : reader(reader) + { + + } + + EStreamError SeekingReader::IsOpen() + { + return this->reader ? this->reader->IsOpen() : EStreamError::eErrorStreamNotOpen; + } + + EStreamError SeekingReader::Read(const Memory::MemoryViewStreamWrite ¶meters) + { + auto error = this->reader ? this->reader->ArbitraryRead(this->index, parameters) : EStreamError::eErrorStreamNotOpen; + if (error == EStreamError::eErrorNone) + { + this->index += parameters.outVariable; + } + return error; + } + + void SeekingReader::Close() + { + if (this->reader) + { + this->reader->Close(); + } + } + + AUKN_SYM AuSPtr NewSeekingReadAdapter(const AuSPtr &reader) + { + return AuMakeShared(reader); + } +} \ No newline at end of file diff --git a/Source/IO/IOAdapterSeeking.hpp b/Source/IO/IOAdapterSeeking.hpp new file mode 100644 index 00000000..e54f55f5 --- /dev/null +++ b/Source/IO/IOAdapterSeeking.hpp @@ -0,0 +1,23 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOAdapterSeeking.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct SeekingReader : IStreamReader + { + AuSPtr reader; + AuUInt index {}; + + SeekingReader(const AuSPtr &reader); + + virtual EStreamError IsOpen() override; + virtual EStreamError Read(const Memory::MemoryViewStreamWrite ¶meters) override; + virtual void Close() override; + }; +} \ No newline at end of file diff --git a/Source/IO/IOBufferedProcessor.cpp b/Source/IO/IOBufferedProcessor.cpp new file mode 100644 index 00000000..ea544652 --- /dev/null +++ b/Source/IO/IOBufferedProcessor.cpp @@ -0,0 +1,130 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOBufferedProcessor.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOBufferedProcessor.hpp" + +namespace Aurora::IO +{ + struct IOBufferedProcessor : IIOBufferedProcessor + { + AuSPtr source; + AuSPtr drain; + AuSPtr processor; + AuUInt32 bufferSize {}; + AuByteBuffer buffer; + + AU_DEFINE_FOR_VA(IOBufferedProcessor, + (AU_DEFINE_CTOR_VA, // initializer-list-like ctor (extending a struct or adding a ctor will break initializer lists) + AU_DEFINE_THIS_MOVE_CTOR_VA, // add move `Object(Object &&)` + AU_DEFINE_EQUALS_VA, // add equals operator + AU_DEFINE_MOVE_VA, // add move assignment operator + AU_DEFINE_COPY_VA), // add copy assignment operator + (source, drain, processor, bufferSize)); + + + AuUInt32 TryProcessBuffered() override; + AuUInt32 GetRawBytesBuffered() override; + AuUInt32 GetRawBytesLimit() override; + + AuUInt32 TryPump(); + }; + + AuUInt32 IOBufferedProcessor::TryProcessBuffered() + { + if (this->buffer.IsEmpty()) + { + this->buffer.Allocate(this->bufferSize); + this->buffer.flagCircular = true; // !!! + } + + if (this->buffer.IsEmpty()) + { + return TryPump(); + } + + AuUInt canBuffer = this->buffer.RemainingWrite(); + canBuffer = AuMin(canBuffer, AuUInt((this->buffer.length + this->buffer.base) - this->buffer.writePtr)); + + AuUInt read {}; + try + { + if (this->source->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) != + AuIO::EStreamError::eErrorNone) + { + return TryPump(); + } + } + catch (...) + { + SysPushErrorCatch(); + } + + this->buffer.writePtr += read; + + if (this->buffer.writePtr == this->buffer.base + this->buffer.length) + { + this->buffer.writePtr = this->buffer.base; + } + + return TryPump(); + } + + AuUInt32 IOBufferedProcessor::TryPump() + { + AuUInt bytesProcessedTotal {}; + AuUInt bytesProcessed {}; + + do + { + AuUInt canRead = this->buffer.RemainingBytes(); + canRead = AuMin(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr); + + try + { + if (!this->processor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->drain)) + { + break; + } + } + catch (...) + { + SysPushErrorCatch(); + } + + this->buffer.readPtr += bytesProcessed; + bytesProcessedTotal += bytesProcessed; + + if (this->buffer.readPtr == this->buffer.base + this->buffer.length) + { + this->buffer.readPtr = this->buffer.base; + } + } + while (AuExchange(bytesProcessed, 0)); + + return bytesProcessedTotal; + } + + AuUInt32 IOBufferedProcessor::GetRawBytesBuffered() + { + return this->buffer.RemainingBytes(); + } + + AuUInt32 IOBufferedProcessor::GetRawBytesLimit() + { + return this->bufferSize; + } + + AUKN_SYM AuSPtr NewBufferedProcessor(const AuSPtr &source, + const AuSPtr &processor, + const AuSPtr &drain, + AuUInt32 bufferSize) + { + return AuMakeShared(source, drain, processor, bufferSize); + } +} \ No newline at end of file diff --git a/Source/IO/IOBufferedProcessor.hpp b/Source/IO/IOBufferedProcessor.hpp new file mode 100644 index 00000000..0862eff0 --- /dev/null +++ b/Source/IO/IOBufferedProcessor.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: BufferedProcessor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + +} \ No newline at end of file diff --git a/Source/IO/IOPipeProcessor.cpp b/Source/IO/IOPipeProcessor.cpp new file mode 100644 index 00000000..8b9aa5f4 --- /dev/null +++ b/Source/IO/IOPipeProcessor.cpp @@ -0,0 +1,15 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOPipeProcessor.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOPipeProcessor.hpp" + +namespace Aurora::IO +{ + +} \ No newline at end of file diff --git a/Source/IO/IOPipeProcessor.hpp b/Source/IO/IOPipeProcessor.hpp new file mode 100644 index 00000000..6e40d22d --- /dev/null +++ b/Source/IO/IOPipeProcessor.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOPipeProcessor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + +} \ No newline at end of file diff --git a/Source/IO/IOProcessor.cpp b/Source/IO/IOProcessor.cpp new file mode 100644 index 00000000..62acf76e --- /dev/null +++ b/Source/IO/IOProcessor.cpp @@ -0,0 +1,763 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessor.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOProcessorItem.hpp" +#include "IOProcessorItems.hpp" +#include "IOProcessor.hpp" + +namespace Aurora::IO +{ + IOProcessor::IOProcessor(AuUInt threadId, bool tickOnly, + AuAsync::WorkerPId_t worker, + const AuSPtr &loop) : mutliplexIOAndTimer(!tickOnly), loopQueue(loop), asyncWorker(worker), threadId(threadId) + { + + } + + IOProcessor::~IOProcessor() + { + this->ReleaseAllWatches(); + } + + bool IOProcessor::Init() + { + if (!this->items.Init()) + { + SysPushErrorNested(); + return false; + } + + if (!this->timers.Init(this, AuLoop::NewLSTimer(0))) + { + SysPushErrorNested(); + return false; + } + + return true; + } + + bool IOProcessor::QueueIOEvent(const AuSPtr &ioEvent) + { + return this->items.AddFrameOrFallback(AuStaticCast(ioEvent)); + } + + AuUInt32 IOProcessor::TryTick() + { + if (!CheckThread()) + { + SysPushErrorGeneric("Wrong Thread"); + return 0; + } + + AU_LOCK_GUARD(this->items.mutex); + FrameStart(); + FrameRunThreadIO(); + FrameRunCheckLSes(); + + bool bHasAlertedItems = this->items.workSignaled.size() || + this->timers.nbTicker.HasPassed(); + + if (!bHasAlertedItems) + { + ReportState(EIOProcessorEventStage::eFrameEndOfFrame); + return 0; + } + + return FrameRunEpilogue(); + } + + bool IOProcessor::TickForRegister(const AuSPtr &ioEvent) + { + return this->items.AddFrameOrFallback(AuStaticCast(ioEvent)); + } + + AuUInt32 IOProcessor::TickFor(const AuSPtr &ioEvent) + { + SysAssert(TickForRegister(ioEvent)); + + if (IsTickOnly()) + { + return 0; + } + + if (this->bFrameStart) + { + return 0; + } + + if (!CheckThread()) + { + SysPushErrorGeneric("Wrong Thread"); + return 0; + } + + + AU_LOCK_GUARD(this->items.mutex); + FrameStart(); + FrameRunThreadIO(); + FrameRunCheckLSes(); + return FrameRunEpilogue(); + } + + AuUInt32 IOProcessor::RunTick() + { + if (!CheckThread()) + { + SysPushErrorGeneric("Wrong Thread"); + AuThreading::Sleep(1); + return 0; + } + + this->bFrameStart = false; + + AU_LOCK_GUARD(this->items.mutex); + FrameStart(); + FrameWaitForAny(0); + FrameRunThreadIO(); + FrameRunCheckLSes(); + return FrameRunEpilogue(); + } + + AuUInt32 IOProcessor::ManualTick() + { + if (!CheckThread()) + { + SysPushErrorGeneric("Wrong Thread"); + return 0; + } + + this->bFrameStart = false; + + AU_LOCK_GUARD(this->items.mutex); + FrameStart(); + FrameRunThreadIO(); + FrameRunCheckLSes(); + return FrameRunEpilogue(); + } + + AuUInt IOProcessor::FrameRunEpilogue() + { + FrameRunAlerted(); + FrameRunAlertedSniffers(); + FrameEndOfFrameEvents(); + return FrameFinalize(); + } + + void IOProcessor::FrameRunCheckLSes() + { + if (!this->IsTickOnly()) + { + return; + } + + for (auto &item : this->items.allItems) + { + if (item->item->IsRunOnSelfIO() && item->item->IsRunOnSelfIOCheckedOnTimerTick()) + { + auto ls = item->item->GetSelfIOSource(); + if (ls && ls->IsSignaled()) + { + // Should deadlock without critical sections + this->QueueIOEvent(item); + } + } + } + } + + void IOProcessor::FrameRunAlerted() + { + for (auto &a : this->items.onTickReceivers) + { + if (a->listener) + { + try + { + a->listener->Tick_RunOnTick(); + + if (!AuExists(this->items.workSignaled, a) && + !AuExists(this->items.onTickReceivers, a)) + { + a->listener->Tick_Any(); + } + } + catch (...) + { + SysPushErrorCatch(); + } + } + + SysAssert(AuTryInsert(this->items.finalizeQueue, a)); + } + + for (auto &a : this->items.workSignaled) + { + if (a->listener) + { + try + { + a->listener->Tick_SelfIOEvent(); + a->listener->Tick_Any(); + } + catch (...) + { + SysPushErrorCatch(); + } + } + + SysAssert(AuTryInsert(this->items.finalizeQueue, a)); + } + } + + void IOProcessor::FrameRunAlertedSniffers() + { + if (this->items.workSignaled.empty()) + { + return; + } + + for (auto &a : this->items.onOtherReceivers) + { + if (AuExists(this->items.workSignaled, a)) + { + continue; + } + + if (a->listener) + { + try + { + a->listener->Tick_OtherIOEvent(); + a->listener->Tick_Any(); + } + catch (...) + { + SysPushErrorCatch(); + + SysAssert(a->FailWatch()); + } + } + + this->items.finalizeQueue.push_back(a); + } + } + + void IOProcessor::FrameEndOfFrameEvents() + { + ReportState(EIOProcessorEventStage::eFrameWorkDone); + + for (auto &pumped : this->items.finalizeQueue) + { + if (!pumped->listener) + { + continue; + } + + try + { + pumped->listener->Tick_FrameEpilogue(); + } + catch (...) + { + SysPushErrorCatch(); + pumped->FailWatch(); + } + } + + for (auto itr = this->items.crossThreadAbort.begin(); itr != this->items.crossThreadAbort.end(); ) + { + bool fatal = itr->second; + auto item = itr->first; + + + this->ClearProcessor(item, fatal); + + itr = this->items.crossThreadAbort.erase(itr); + } + } + + + void IOProcessor::ClearProcessor(const AuSPtr &processor, bool fatal) + { + if (!AuTryRemove(this->items.allItems, processor)) + { + return; + } + + try + { + if (fatal) + { + processor->listener->OnFailureCompletion(); + } + else + { + processor->listener->OnNominalCompletion(); + } + } + catch (...) + { + SysPushErrorCatch(); + } + + auto item = processor->item; + + if (item->IsRunOnTick()) + { + if (!AuTryRemove(this->items.onTickReceivers, processor)) + { + SysPushErrorMem(); + } + } + + if (item->IsRunOnOtherTick()) + { + if (!AuTryRemove(this->items.onOtherReceivers, processor)) + { + SysPushErrorMem(); + } + } + + if (item->IsRunOnSelfIO()) + { + if (!AuTryRemove(this->items.registeredIO, processor)) + { + SysPushErrorMem(); + } + + auto src = item->GetSelfIOSource(); + if (!src) + { + SysPushErrorGeneric(); + return; + } + + if (!this->ToQueue()->SourceRemove(src)) + { + SysPushErrorNested("IO Remove Error [!!!]"); + } + } + } + + AuUInt IOProcessor::FrameFinalize() + { + auto C = this->items.finalizeQueue.size(); + this->items.workSignaled.clear(); + this->items.finalizeQueue.clear(); + + ReportState(EIOProcessorEventStage::eFrameEndOfFrame); + this->bFrameStart = false; + return C; + } + + bool IOProcessor::InternalIsTickReady() + { + // TODO: tickless io ad-hoc event-as-condvar? + return this->items.workSignaled.size() || + this->items.crossThreadAbort.size(); + } + + bool IOProcessor::FrameWaitForAny(AuUInt32 msMax) + { + if (InternalIsTickReady()) + { + return true; + } + + if (this->IsTickOnly()) + { + auto next = this->timers.nbTicker.nextTriggerTime; + auto now = AuTime::CurrentInternalClockNS(); + + if (now >= next) + { + return true; + } + + auto delay = AuNSToMS(next - now); + + #if 0 + if (delay == 1) + { + bool ret {}; + while (IO::IOYield()); + } + else + #endif + { + return IO::WaitFor(msMax ? AuMin(msMax, delay) : delay, true); + } + } + else + { + return this->loopQueue->WaitAny(msMax); + } + } + + bool IOProcessor::AddEventListener(const AuSPtr &eventListener) + { + AU_LOCK_GUARD(this->listenersSpinLock); + + return AuTryInsert(this->listeners, eventListener); + } + + void IOProcessor::RemoveEventListener(const AuSPtr &eventListener) + { + AU_LOCK_GUARD(this->listenersSpinLock); + + AuTryRemove(this->listeners, eventListener); + } + + void IOProcessor::DispatchFrame(ProcessInfo &info) + { + this->ManualTick(); + + auto ns = this->refreshRateNs; + if (ns) + { + info = AuAsync::ETickType::eSchedule; + info.reschedNs = ns; + } + } + + void IOProcessor::FrameStart() + { + ReportState(EIOProcessorEventStage::eFrameStartOfFrame); + this->bFrameStart = true; + + auto blocked = this->items.GetBlockedSignals(); + if (blocked.size()) + { + AU_LOCK_GUARD(this->items.mutex); + this->items.workSignaled.insert(this->items.workSignaled.end(), blocked.begin(), blocked.end()); + } + } + + void IOProcessor::FrameRunThreadIO() + { + ReportState(EIOProcessorEventStage::eFrameYieldIO); + while (IO::IOYield()); + ReportState(EIOProcessorEventStage::eFrameIODone); + } + + AuUInt64 IOProcessor::SetRefreshRate(AuUInt64 ns) + { + if (!CheckThread()) + { + AU_THROW_STRING("Wrong Thread"); + } + + bool changed = bool(ns) != bool(this->refreshRateNs); + + auto old = AuExchange(this->refreshRateNs, ns); + + UpdateTimers(); + + if (changed) + { + if (ns) + { + AddTimer(); + } + else + { + RemoveTimer(); + } + } + + return old; + } + + bool IOProcessor::HasRefreshRate() + { + return bool(this->refreshRateNs); + } + + bool IOProcessor::MultiplexRefreshRateWithIOEvents() + { + return this->mutliplexIOAndTimer; + } + + AuUInt64 IOProcessor::GetOwnedThreadId() + { + return this->threadId; + } + + void IOProcessor::UpdateTimers() + { + this->timers.lsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs); + this->timers.nbTicker.nsTimeStep = this->refreshRateNs; + + if (!this->timers.nbTicker.nextTriggerTime) + { + this->timers.nbTicker.nextTriggerTime = AuTime::CurrentInternalClockNS() + this->timers.nbTicker.nsTimeStep; + } + } + + void IOProcessor::AddTimer() + { + if (IsAsync()) + { + StartAsyncTimerIfAny(); + } + else + { + AddTimerLS(); + } + } + + void IOProcessor::AddTimerLS() + { + this->ToQueue()->SourceAdd(this->timers.lsTicker); + this->ToQueue()->AddCallback(this->timers.lsTicker, AuSPtr(AuSharedFromThis(), &this->timers)); + this->ToQueue()->Commit(); + } + + void IOProcessor::StartAsyncTimerIfAny() + { + if (!this->workItem) + { + this->workItem = this->asyncWorker.pool->NewWorkItem(this->asyncWorker, AuSharedFromThis()); + } + + this->workItem->SetSchedTimeNs(this->refreshRateNs); + + this->workItem->Dispatch(); + } + + void IOProcessor::RemoveTimer() + { + if (IsAsync()) + { + CancelWorkItem(); + } + else + { + RemoveLSTimer(); + } + } + + void IOProcessor::CancelWorkItem() + { + this->workItem->Cancel(); + this->workItem = {}; + } + + void IOProcessor::RemoveLSTimer() + { + this->ToQueue()->SourceRemove(this->timers.lsTicker); + } + + bool IOProcessor::IsAsync() + { + return bool(this->asyncWorker.pool); + } + + bool IOProcessor::IsTickOnly() + { + return !this->mutliplexIOAndTimer && + this->timers.nbTicker.nextTriggerTime; + } + + bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr &processor) + { + return {}; + } + + AuSPtr IOProcessor::StartIOWatch(const AuSPtr &object, const AuSPtr &listener) + { + if (!CheckThread()) + { + AU_THROW_STRING("Wrong Thread"); + } + + auto item = AuMakeShared(); + item->parent = this; + item->listener = listener; + item->item = object; + + AU_LOCK_GUARD(this->items.mutex); + + this->items.allItems.push_back(item); + + if (object->IsRunOnTick()) + { + if (!AuTryInsert(this->items.onTickReceivers, item)) + { + SysPushErrorMem(); + return {}; + } + } + + if (object->IsRunOnOtherTick()) + { + if (!AuTryInsert(this->items.onOtherReceivers, item)) + { + SysPushErrorMem(); + return {}; + } + } + + if (object->IsRunOnSelfIO()) + { + auto src = object->GetSelfIOSource(); + if (!src) + { + SysPushErrorGeneric(); + return {}; + } + + if (!this->ToQueue()->SourceAdd(src)) + { + SysPushErrorNested("IO Error"); + return {}; + } + + if (!this->ToQueue()->AddCallback(src, item)) + { + SysPushErrorNested("IO Error"); + SysAssert(this->ToQueue()->SourceRemove(src)); + return {}; + } + + if (!this->ToQueue()->Commit()) + { + SysPushErrorIO("midframe recommit"); + } + + if (!AuTryInsert(this->items.registeredIO, item)) + { + SysPushErrorMem(); + SysAssert(this->ToQueue()->SourceRemove(src)); + return {}; + } + } + + return item; + } + + AuSPtr IOProcessor::ToPipeProcessor() + { + if (!CheckThread()) + { + AU_THROW_STRING("Wrong Thread"); + } + + return {}; + } + + AuSPtr IOProcessor::ToQueue() + { + return this->loopQueue; + } + + void IOProcessor::ReleaseAllWatches() + { + + } + + bool IOProcessor::HasItems() + { + return bool(this->items.allItems.size()); + } + + void IOProcessor::ReportState(EIOProcessorEventStage stage) + { + AU_LOCK_GUARD(this->listenersSpinLock); + + for (auto &listeners : this->listeners) + { + try + { + listeners->OnIOProcessorStateEvent(stage); + } + catch (...) + { + SysPushErrorCatch(); + } + } + } + + bool IOProcessor::CheckThread() + { + if (this->threadId == 0) + { + this->threadId = AuThreads::GetThreadId(); + return true; + } + + return this->threadId == AuThreads::GetThreadId(); + } + + AUKN_SYM AuSPtr NewIOProcessor(bool tickOnly, const AuSPtr &queue) + { + auto processor = AuMakeShared(0, tickOnly, AuAsync::WorkerPId_t {}, queue); + if (!processor) + { + SysPushErrorMem(); + return {}; + } + + if (!processor->Init()) + { + SysPushErrorNested(); + return {}; + } + + return processor; + } + + AUKN_SYM AuSPtr NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id) + { + if (!id.pool) + { + return {}; + } + + auto thread = id.pool->ResolveHandle(id); + if (!thread) + { + SysPushErrorGeneric("Worker PID failed to resolve to a thread object"); + return {}; + } + + auto queue = id.pool->ToKernelWorkQueue(id); + if (!queue) + { + SysPushErrorGeneric("Worker PID has no kernel work queue"); + return {}; + } + + auto processor = AuMakeShared(AuUInt(thread.get()), tickOnly, id, queue); + if (!processor) + { + SysPushErrorMem(); + return {}; + } + + if (!processor->Init()) + { + SysPushErrorNested(); + return {}; + } + + return processor; + } + + AUKN_SYM AuSPtr NewIOProcessorNoQueue(bool tickOnly) + { + auto loop = AuLoop::NewLoopQueue(); + if (!loop) + { + SysPushErrorMem(); + return {}; + } + + return NewIOProcessor(tickOnly, loop); + } +} \ No newline at end of file diff --git a/Source/IO/IOProcessor.hpp b/Source/IO/IOProcessor.hpp new file mode 100644 index 00000000..f5785229 --- /dev/null +++ b/Source/IO/IOProcessor.hpp @@ -0,0 +1,107 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessor.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +#include "IOProcessorItems.hpp" +#include "IOProcessorTimers.hpp" + +namespace Aurora::IO +{ + struct IOProcessor : IIOProcessor, AuEnableSharedFromThis, AuAsync::IWorkItemHandler + { + ~IOProcessor(); + IOProcessor(AuUInt threadId, bool tickOnly, AuAsync::WorkerPId_t worker, const AuSPtr &loop); + + bool Init(); + + bool QueueIOEvent(const AuSPtr &ioEvent); + + AuUInt32 TryTick() override; + AuUInt32 RunTick() override; + AuUInt32 TickFor(const AuSPtr &ioEvent); + bool TickForRegister(const AuSPtr &ioEvent); + + AuUInt32 ManualTick() override; + + void DispatchFrame(ProcessInfo &info) override; + + bool AddEventListener(const AuSPtr &eventListener) override; + void RemoveEventListener(const AuSPtr &eventListener) override; + + void FrameStart(); + bool FrameWaitForAny(AuUInt32 msMax); + AuUInt FrameRunEpilogue(); + void FrameRunThreadIO(); + void FrameRunCheckLSes(); + void FrameRunAlerted(); + void FrameRunAlertedSniffers(); + void FrameEndOfFrameEvents(); + AuUInt FrameFinalize(); + + + bool InternalIsTickReady(); + + void ClearProcessor(const AuSPtr &processor, bool fatal); + + void ReportState(EIOProcessorEventStage stage); + + AuUInt64 SetRefreshRate(AuUInt64 ns) override; + bool HasRefreshRate() override; + + AuUInt64 GetOwnedThreadId() override; + + bool MultiplexRefreshRateWithIOEvents() override; + + AuSPtr StartIOWatch(const AuSPtr &object, const AuSPtr &listener) override; + + AuSPtr ToPipeProcessor() override; + + AuSPtr ToQueue() override; + void ReleaseAllWatches() override; + + bool HasItems() override; + + bool CheckThread(); + + bool RequestRemovalForItemFromAnyThread(const AuSPtr &processor); + + AuSPtr loopQueue; + + AuUInt threadId; + + AuUInt64 refreshRateNs {}; + AuUInt64 minFrameDeltaNs {}; + + void UpdateTimers(); + + void AddTimer(); + void AddTimerLS(); + void StartAsyncTimerIfAny(); + void RemoveTimer(); + + void CancelWorkItem(); + void RemoveLSTimer(); + + bool IsTickOnly(); + bool IsAsync(); + + bool mutliplexIOAndTimer {true}; + + IOProcessorItems items; + IOProcessorTimers timers; + AuAsync::WorkerPId_t asyncWorker; + AuSPtr workItem; + + AuThreadPrimitives::MutexUnique_t mutex; + + AuThreadPrimitives::SpinLock listenersSpinLock; + AuList> listeners; + + bool bFrameStart {}; + }; +} \ No newline at end of file diff --git a/Source/IO/IOProcessorItem.cpp b/Source/IO/IOProcessorItem.cpp new file mode 100644 index 00000000..75219624 --- /dev/null +++ b/Source/IO/IOProcessorItem.cpp @@ -0,0 +1,76 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessorItem.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOProcessorItem.hpp" +#include "IOProcessor.hpp" + +namespace Aurora::IO +{ + bool IOProcessorItem::StopWatch() + { + if (this->parent->CheckThread() && + this->parent->bFrameStart) + { + this->parent->ClearProcessor(AuSharedFromThis(), false); + return false; + } + + return this->parent->items.ScheduleFinish(AuSharedFromThis(), false); + } + + bool IOProcessorItem::FailWatch() + { + if (this->parent->CheckThread() && + this->parent->bFrameStart) + { + this->parent->ClearProcessor(AuSharedFromThis(), true); + return false; + } + + return this->parent->items.ScheduleFinish(AuSharedFromThis(), true); + } + + bool IOProcessorItem::OnFinished(const AuSPtr &source) + { + IOAlert(false); + return false; + } + + void IOProcessorItem::InvokeManualTick() + { + IOAlert(true); + } + + void IOProcessorItem::IOAlert(bool force) + { + if (!this->parent->IsTickOnly()) + { + // We *should* be mid-yield in the frame + // We register the item for the later stages of the IO frame + this->parent->TickForRegister(AuSharedFromThis()); + } + else if (force) // manual tick + { + + if (this->parent->CheckThread()) + { + this->parent->TickFor(AuSharedFromThis()); + this->parent->ManualTick(); + } + else if (parent->IsAsync()) + { + // TODO: + } + else + { + SysPanic("Missing"); + } + } + } +} \ No newline at end of file diff --git a/Source/IO/IOProcessorItem.hpp b/Source/IO/IOProcessorItem.hpp new file mode 100644 index 00000000..5615db72 --- /dev/null +++ b/Source/IO/IOProcessorItem.hpp @@ -0,0 +1,35 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessorItem.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IOProcessor; + + struct IOProcessorItem : AuLoop::ILoopSourceSubscriber, IIOProcessorItem, AuEnableSharedFromThis + { + IOProcessor *parent; + + AuSPtr item; + AuSPtr listener; + + // ILoopSourceSubscriber + bool OnFinished(const AuSPtr &source) override; + + // IIOProcessorItem + virtual bool StopWatch() override; + virtual bool FailWatch() override; + + // IIOProcessorItem::... + void InvokeManualTick() override; + + // + void IOAlert(bool force); + }; + +} \ No newline at end of file diff --git a/Source/IO/IOProcessorItems.cpp b/Source/IO/IOProcessorItems.cpp new file mode 100644 index 00000000..bf657485 --- /dev/null +++ b/Source/IO/IOProcessorItems.cpp @@ -0,0 +1,51 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessorItems.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOProcessorItem.hpp" +#include "IOProcessorItems.hpp" + +namespace Aurora::IO +{ + bool IOProcessorItems::Init() + { + this->mutex = AuThreadPrimitives::CriticalSectionUnique(); + this->mutex2 = AuThreadPrimitives::CriticalSectionUnique(); + + return bool(this->mutex) && bool(this->mutex2); + } + + bool IOProcessorItems::AddFrameTemp(const AuSPtr &item) + { + AU_TRY_LOCK_GUARD_RET_DEF(this->mutex); + return AuTryInsert(this->workSignaled, item); + } + + bool IOProcessorItems::AddFrameOrFallback(const AuSPtr &item) + { + if (!AddFrameTemp(item)) + { + AU_LOCK_GUARD(this->mutex2); + return AuTryInsert(this->workSignaled2, item); + } + + return true; + } + + bool IOProcessorItems::ScheduleFinish(const AuSPtr &item, bool unsafe) + { + AU_TRY_LOCK_GUARD_RET_DEF(this->mutex); + return AuTryInsert(this->crossThreadAbort, AuMakePair(item, unsafe)); + } + + AuList> IOProcessorItems::GetBlockedSignals() + { + AU_LOCK_GUARD(this->mutex2); + return AuExchange(this->workSignaled2, {}); + } +} \ No newline at end of file diff --git a/Source/IO/IOProcessorItems.hpp b/Source/IO/IOProcessorItems.hpp new file mode 100644 index 00000000..5eee303d --- /dev/null +++ b/Source/IO/IOProcessorItems.hpp @@ -0,0 +1,40 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessorItems.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IOProcessor; + struct IOProcessorItem; + + struct IOProcessorItems + { + AuList> allItems; + AuList> onTickReceivers; + AuList> onOtherReceivers; + + AuList> registeredIO; + + AuThreadPrimitives::CriticalSectionUnique_t mutex; + AuList> workSignaled; + AuThreadPrimitives::CriticalSectionUnique_t mutex2; + AuList> workSignaled2; + + AuList> finalizeQueue; + AuList, bool>> crossThreadAbort; + + bool Init(); + + bool AddFrameTemp(const AuSPtr &item); + bool AddFrameOrFallback(const AuSPtr &item); + + bool ScheduleFinish(const AuSPtr &item, bool unsafe); + + AuList> GetBlockedSignals(); + }; +} \ No newline at end of file diff --git a/Source/IO/IOProcessorTimers.cpp b/Source/IO/IOProcessorTimers.cpp new file mode 100644 index 00000000..244a8746 --- /dev/null +++ b/Source/IO/IOProcessorTimers.cpp @@ -0,0 +1,27 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessorTimers.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOProcessorTimers.hpp" +#include "IOProcessor.hpp" + +namespace Aurora::IO +{ + bool IOProcessorTimers::Init(IOProcessor *parent, AuSPtr lsTicker) + { + this->parent = parent; + this->lsTicker = lsTicker; + return bool(this->parent) && bool(lsTicker); + } + + bool IOProcessorTimers::OnFinished(const AuSPtr &source) + { + this->parent->ManualTick(); + return false; + } +} \ No newline at end of file diff --git a/Source/IO/IOProcessorTimers.hpp b/Source/IO/IOProcessorTimers.hpp new file mode 100644 index 00000000..4c8fc9ee --- /dev/null +++ b/Source/IO/IOProcessorTimers.hpp @@ -0,0 +1,24 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOProcessorTimers.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IOProcessor; + + struct IOProcessorTimers : AuLoop::ILoopSourceSubscriber + { + IOProcessor *parent {}; + AuSPtr lsTicker; + Utility::RateLimiter nbTicker; + + bool Init(IOProcessor *parent, AuSPtr lsTicker); + + bool OnFinished(const AuSPtr &source) override; + }; +} \ No newline at end of file diff --git a/Source/IO/IOSleep.Linux.cpp b/Source/IO/IOSleep.Linux.cpp index 0972e4c2..a46865f4 100644 --- a/Source/IO/IOSleep.Linux.cpp +++ b/Source/IO/IOSleep.Linux.cpp @@ -41,7 +41,7 @@ namespace Aurora::IO sleep = 0; } - if (!UNIX::LinuxOverlappedPoll(sleep, true)) + if (!UNIX::LinuxOverlappedPoll(sleep)) { continue; } diff --git a/Source/IO/IOSleep.NT.cpp b/Source/IO/IOSleep.NT.cpp index ed99901a..0e1c9ef8 100644 --- a/Source/IO/IOSleep.NT.cpp +++ b/Source/IO/IOSleep.NT.cpp @@ -58,4 +58,9 @@ namespace Aurora::IO return bHit; } + + AUKN_SYM bool IOYield() + { + return SleepEx(0, true) == WAIT_IO_COMPLETION; + } } \ No newline at end of file diff --git a/Source/IO/IOWaitableIOLoopSource.cpp b/Source/IO/IOWaitableIOLoopSource.cpp new file mode 100644 index 00000000..95a33abe --- /dev/null +++ b/Source/IO/IOWaitableIOLoopSource.cpp @@ -0,0 +1,73 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOWaitableIOLoopSource.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOWaitableIOLoopSource.hpp" + +namespace Aurora::IO +{ + IOWatachableIOLoopSource::IOWatachableIOLoopSource(const AuSPtr &source) : source(source) + { + + } + + bool IOWatachableIOLoopSource::IsRunOnOtherTick() + { + return {}; + } + + bool IOWatachableIOLoopSource::IsRunOnTick() + { + return {}; + } + + bool IOWatachableIOLoopSource::CanRequestTick() + { + return {}; + } + + void IOWatachableIOLoopSource::OnReportPumper(const AuSPtr &iface) + { + + } + + bool IOWatachableIOLoopSource::IsRunOnSelfIO() + { + return true; + } + + AuSPtr IOWatachableIOLoopSource::GetSelfIOSource() + { + return GetSelfIOSource(); + } + + bool IOWatachableIOLoopSource::ApplyRateLimit() + { + return {}; + } + + AuSPtr IOWatachableIOLoopSource::GetLoopSource() + { + return this->source; + } + + AuSPtr IOWatachableIOLoopSource::SetLoopSource(const AuSPtr &ls) + { + return AuExchange(this->source, ls); + } + + bool IOWatachableIOLoopSource::IsRunOnSelfIOCheckedOnTimerTick() + { + return true; + } + + AUKN_SYM AuSPtr NewWaitableLoopSource(const AuSPtr &ptr) + { + return AuMakeShared(ptr); + } +} \ No newline at end of file diff --git a/Source/IO/IOWaitableIOLoopSource.hpp b/Source/IO/IOWaitableIOLoopSource.hpp new file mode 100644 index 00000000..9cc8c1b2 --- /dev/null +++ b/Source/IO/IOWaitableIOLoopSource.hpp @@ -0,0 +1,33 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOWaitableIOLoopSource.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IOWatachableIOLoopSource : IIOWatachableIOLoopSource + { + IOWatachableIOLoopSource(const AuSPtr &source); + + bool IsRunOnOtherTick() override; + bool IsRunOnTick() override; + + bool CanRequestTick() override; + void OnReportPumper(const AuSPtr &iface) override; + + bool IsRunOnSelfIO() override; + AuSPtr GetSelfIOSource() override; + + bool ApplyRateLimit() override; + + AuSPtr GetLoopSource() override; + AuSPtr SetLoopSource(const AuSPtr &ls) override; + bool IsRunOnSelfIOCheckedOnTimerTick() override; + + AuSPtr source; + }; +} \ No newline at end of file diff --git a/Source/IO/IOWaitableIOTimer.cpp b/Source/IO/IOWaitableIOTimer.cpp new file mode 100644 index 00000000..661637f0 --- /dev/null +++ b/Source/IO/IOWaitableIOTimer.cpp @@ -0,0 +1,97 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOWaitableIOTimer.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOWaitableIOTimer.hpp" + +namespace Aurora::IO +{ + IOWaitableIOTimer::IOWaitableIOTimer() + { + + } + + bool IOWaitableIOTimer::IsValid() + { + return bool(this->source); + } + + bool IOWaitableIOTimer::IsRunOnOtherTick() + { + return {}; + } + + bool IOWaitableIOTimer::IsRunOnTick() + { + return {}; + } + + bool IOWaitableIOTimer::CanRequestTick() + { + return {}; + } + + void IOWaitableIOTimer::OnReportPumper(const AuSPtr &iface) + { + + } + + bool IOWaitableIOTimer::IsRunOnSelfIO() + { + return true; + } + + AuSPtr IOWaitableIOTimer::GetSelfIOSource() + { + return this->source; + } + + bool IOWaitableIOTimer::IsRunOnSelfIOCheckedOnTimerTick() + { + return true; + } + + bool IOWaitableIOTimer::ApplyRateLimit() + { + return false; + } + + AuUInt64 IOWaitableIOTimer::SetConstantTick(AuUInt64 ns) + { + auto old = AuExchange(this->constantTickNs, ns); + this->source->UpdateTickRateIfAnyNs(ns); + return old; + } + + AuUInt64 IOWaitableIOTimer::SetTargetTimeAbs(AuUInt64 ns) + { + auto old = AuExchange(this->targetTimeAbsNs, ns); + this->source->UpdateTimeNs(ns); + return old; + } + + AUKN_SYM AuSPtr NewWaitableIOTimer() + { + auto timer = AuMakeShared(); + if (!timer) + { + SysPushErrorMem(); + return {}; + } + + timer->source = AuLoop::NewLSTimer(0); + + if (!timer->IsValid()) + { + SysPushErrorMem(); + return {}; + } + + return timer; + } +} \ No newline at end of file diff --git a/Source/IO/IOWaitableIOTimer.hpp b/Source/IO/IOWaitableIOTimer.hpp new file mode 100644 index 00000000..3c685619 --- /dev/null +++ b/Source/IO/IOWaitableIOTimer.hpp @@ -0,0 +1,37 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOWaitableIOTimer.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IOWaitableIOTimer : IIOWaitableIOTimer + { + IOWaitableIOTimer(); + + bool IsValid(); + + bool IsRunOnOtherTick() override; + bool IsRunOnTick() override; + + bool CanRequestTick() override; + void OnReportPumper(const AuSPtr &iface) override; + + bool IsRunOnSelfIO() override; + AuSPtr GetSelfIOSource() override; + bool IsRunOnSelfIOCheckedOnTimerTick() override; + + bool ApplyRateLimit() override; + + AuUInt64 SetConstantTick(AuUInt64 ns) override; + AuUInt64 SetTargetTimeAbs(AuUInt64 ns) override; + + AuSPtr source; + AuUInt64 constantTickNs {}; + AuUInt64 targetTimeAbsNs {}; + }; +} \ No newline at end of file diff --git a/Source/IO/IOWaitableTickLimiter.cpp b/Source/IO/IOWaitableTickLimiter.cpp new file mode 100644 index 00000000..0818bc5a --- /dev/null +++ b/Source/IO/IOWaitableTickLimiter.cpp @@ -0,0 +1,15 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOWaitableTickLimiter.cpp + Date: 2022-6-6 + Author: Reece +***/ +#include +#include +#include "IOWaitableTickLimiter.hpp" + +namespace Aurora::IO +{ + +} \ No newline at end of file diff --git a/Source/IO/IOWaitableTickLimiter.hpp b/Source/IO/IOWaitableTickLimiter.hpp new file mode 100644 index 00000000..0206c76a --- /dev/null +++ b/Source/IO/IOWaitableTickLimiter.hpp @@ -0,0 +1,13 @@ +/*** + Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOWaitableTickLimiter.hpp + Date: 2022-6-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + +} \ No newline at end of file diff --git a/Source/Loop/LSTimer.NT.cpp b/Source/Loop/LSTimer.NT.cpp index c42894a6..f9b74cb0 100644 --- a/Source/Loop/LSTimer.NT.cpp +++ b/Source/Loop/LSTimer.NT.cpp @@ -52,6 +52,7 @@ namespace Aurora::Loop this->reschedStepNsOrZero_ = AuMSToNS(reschedStepMsOrZero); this->maxIterationsOrZero_ = maxIterationsOrZero; this->count_ = 0; + UpdateTimeInternal(this->targetTime_); } void LSTimer::UpdateTickRateIfAnyNs(AuUInt64 reschedStepNsOrZero, AuUInt32 maxIterationsOrZero) @@ -59,6 +60,7 @@ namespace Aurora::Loop this->reschedStepNsOrZero_ = reschedStepNsOrZero; this->maxIterationsOrZero_ = maxIterationsOrZero; this->count_ = 0; + UpdateTimeInternal(this->targetTime_); } bool LSTimer::OnTrigger(AuUInt handle) diff --git a/Source/RuntimeInternal.hpp b/Source/RuntimeInternal.hpp index fd9a2765..b4aa4784 100644 --- a/Source/RuntimeInternal.hpp +++ b/Source/RuntimeInternal.hpp @@ -57,6 +57,8 @@ #endif +#define GIMME_IOWAITABLEITEM + #include inline Aurora::RuntimeStartInfo gRuntimeConfig;