2/3 of the IO update (very early implementation)

[+] TTYConsole::GetPaddingTopOfLog,GetPaddingHeadOfLog,GetPaddingTopOfLog [+ set variants]
[+] IO::IOYield()
[+] IO::IAsyncTransaction::Failed,GetOSErrorCode()
[+] IByteBufferStreamPair
[+] IIOBufferedInterceptor
[+] IIOBufferedProcessor
[+] IIOEventListener
[+] IIOPipeEventListener
[+] IIOProcessorEventListener
[+] IIOProcessorManualInvoker
[+] IIOWaitableIOLoopSource
[+] IIOWaitableIOTimer
[+] IIOWaitableItem
[+] IIOWaitableTickLimiter
[+] IOAdapterAsyncStream
[+] IOAdapterByteBuffer
[+] IOAdapterCompression
[+] IOAdapterSeeking
[*] Cleanup CpuInfo.Linux.cpp
[*] Fixup async threadpool some more
[*] LSTimer.NT.cpp updates timer object on tick state update, akin to Linux
This commit is contained in:
Reece Wilson 2022-06-12 00:01:27 +01:00
parent 0a4c0bacfc
commit 44108a322e
64 changed files with 3185 additions and 44 deletions

View File

@ -40,8 +40,8 @@ namespace Aurora::Async
/// A really terrible name for the overloadable method that serves as the critical failure callback
/// This may run from any thread
virtual void OnFailure() {};
inline virtual void OnFailure() {};
virtual void *GetPrivateData() { return nullptr; }
inline virtual void *GetPrivateData() { return nullptr; }
};
}

View File

@ -65,11 +65,11 @@ namespace Aurora::Console::ConsoleTTY
virtual bool GetLeftBorder() = 0;
virtual bool GetRightBorder() = 0;
virtual bool GetBottomBorder() = 0;
virtual bool GetBannerFootBorder() = 0;
virtual AuUInt8 GetPaddingHeadOfLog(AuUInt8 newValue) = 0;
virtual AuUInt8 GetPaddingTopOfLog(AuUInt8 newValue) = 0;
virtual AuUInt8 SetPaddingLeftOfLog(AuUInt8 newValue) = 0;
virtual AuUInt8 SetPaddingRightOfLog(AuUInt8 newValue) = 0;
virtual AuUInt8 SetPaddingLeftOfInput(AuUInt8 newValue) = 0;
@ -79,7 +79,8 @@ namespace Aurora::Console::ConsoleTTY
virtual AuUInt8 SetPaddingMidOfHeader(AuUInt8 newValue) = 0;
virtual AuUInt8 SetPaddingBottomOfSubheader(AuUInt8 newValue) = 0;
virtual AuUInt8 GetPaddingHeadOfLog() = 0;
virtual AuUInt8 GetPaddingTopOfLog() = 0;
virtual AuUInt8 GetPaddingLeftOfLog() = 0;
virtual AuUInt8 GetPaddingRightOfLog() = 0;
virtual AuUInt8 GetPaddingLeftOfInput() = 0;

View File

@ -0,0 +1,19 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: EIOProcessorEventStage.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUE_DEFINE(EIOProcessorEventStage, (
eFrameStartOfFrame,
eFrameYieldIO,
eFrameIODone,
eFrameWorkDone,
eFrameEndOfFrame
));
}

View File

@ -32,6 +32,18 @@ namespace Aurora::IO
*/
virtual bool Complete() = 0;
/**
* @brief
* @return
*/
virtual bool Failed() = 0;
/**
* @brief
* @return
*/
virtual AuUInt GetOSErrorCode() = 0;
/**
* @brief Returns the last packets length assuming ::Complete() is true or you are within the registered callback
*/

View File

@ -0,0 +1,24 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IByteBufferStreamPair.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IByteBufferStreamPair
{
virtual AuSPtr<IStreamReader> ToStreamReader() = 0;
virtual AuSPtr<ISeekingReader> ToSeekingReader() = 0;
virtual AuSPtr<IStreamWriter> ToStreamWriter() = 0;
virtual AuSPtr<Memory::ByteBuffer> ToByteBuffer() = 0;
};
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPair();
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPairEx(AuUInt length, bool permitResize);
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewRingByteBuffer(AuUInt length);
}

View File

@ -0,0 +1,23 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOBufferedProcessor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOBufferedProcessor
{
virtual AuUInt32 TryProcessBuffered() = 0;
virtual AuUInt32 GetRawBytesBuffered() = 0;
virtual AuUInt32 GetRawBytesLimit() = 0;
};
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &source,
const AuSPtr<IIOPipeInterceptor> &processor,
const AuSPtr<IStreamWriter> &drain,
AuUInt32 bufferSize);
}

View File

@ -0,0 +1,26 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOEventListener.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_INTERFACE(IIOEventListener,
AUI_METHOD(void, Tick_RunOnTick, ()), // called on each tick, if registered by the iWaitableitem interface
AUI_METHOD(void, Tick_OtherIOEvent, ()), // called if a different object woke us up
AUI_METHOD(void, Tick_SelfIOEvent, ()), // called if a ticker, io loop source, or other self-iWaitableitem interface poked us
AUI_METHOD(void, Tick_Any, ()), // always called if any of the above are called
AUI_METHOD(void, Tick_FrameEpilogue, ()), // called at the end of the tick, once every IOEventListener has had time to fire
AUI_METHOD(void, OnFailureCompletion, ()),
AUI_METHOD(void, OnNominalCompletion, ())
);
}

View File

@ -0,0 +1,19 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOPipeEventListener.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOPipeData;
AUKN_INTERFACE(IIOPipeEventListener,
AUI_METHOD(void, OnPipePartialEvent, (const IOPipeData &, current, AuUInt, tranferred)),
AUI_METHOD(void, OnPipeSuccessEvent, (const IOPipeData &, current)),
AUI_METHOD(void, OnPipeFailureEvent, (const IOPipeData &, current))
);
}

View File

@ -0,0 +1,16 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOPipeInterceptor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_INTERFACE(IIOPipeInterceptor,
AUI_METHOD(bool, OnDataAvailable, (const Memory::MemoryViewStreamRead &, view,
const AuSPtr<IStreamWriter> &, out))
);
}

View File

@ -0,0 +1,72 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOPipeProcessor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOPipeInterceptor;
struct IOPipeData
{
AuSPtr<IIOWaitableItem> watchItem;
AuSPtr<IStreamReader> reader;
AuSPtr<IStreamWriter> writer;
};
struct IOPipeRequest
{
/**
* @brief The two streams to join and an invokable object
*/
IOPipeData data;
/**
* @brief Amount of bytes to transfer
*/
AuUInt32 lengthOrZero {};
/**
* @brief event listener
*/
AuSPtr<IIOPipeEventListener> listener;
/**
* @brief Used as the buffer size for streams of length 0
*/
AuUInt32 fallbackPageSize {4096 * 50};
AuSPtr<IIOPipeInterceptor> processor;
};
/**
* @brief Different operating systems implement high level stream copy abstraction between network, file, and/or file descriptors.
* This interface connects arbitrary stream objects to one another by piping data under an iprocessor tick; or delegates
* such task to the operating system, if possible.
*/
struct IIOPipeProcessor
{
/**
* @brief
* @param request
* @return
*/
virtual bool BeginPipe(const IOPipeRequest &request) = 0;
/**
* @brief
* @param itm
*/
virtual void EndByWatch(const AuSPtr<IIOWaitableItem> &itm) = 0;
/**
* @brief
* @param itm
*/
virtual void EndByListener(const AuSPtr<IIOPipeEventListener> &itm) = 0;
};
}

View File

@ -0,0 +1,46 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOProcessor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOProcessor
{
virtual AuUInt32 TryTick() = 0;
virtual AuUInt32 RunTick() = 0;
virtual AuUInt32 TickFor(const AuSPtr<IIOProcessorItem> &ioEvent) = 0;
virtual AuUInt32 ManualTick() = 0;
virtual AuUInt64 SetRefreshRate(AuUInt64 ns) = 0;
virtual bool HasRefreshRate() = 0;
virtual bool MultiplexRefreshRateWithIOEvents() = 0;
virtual AuUInt64 GetOwnedThreadId() = 0;
virtual AuSPtr<IIOProcessorItem> StartIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener) = 0;
// Inter-frame callbacks indicating point of execution throughout the [START](optional [yield])[IO][TICK][EPILOGUE][END OF FRAME] frame
virtual bool AddEventListener (const AuSPtr<IIOProcessorEventListener> &eventListener) = 0;
virtual void RemoveEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener) = 0;
virtual AuSPtr<IIOPipeProcessor> ToPipeProcessor() = 0;
// Reference loop queue
virtual AuSPtr<Loop::ILoopQueue> ToQueue() = 0;
virtual void ReleaseAllWatches() = 0;
virtual bool HasItems() = 0;
};
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessor(bool tickOnly, const AuSPtr<Loop::ILoopQueue> &queue);
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id);
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorNoQueue(bool tickOnly);
}

View File

@ -0,0 +1,15 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOProcessorEventListener.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_INTERFACE(IIOProcessorEventListener,
AUI_METHOD(void, OnIOProcessorStateEvent, (EIOProcessorEventStage, stage))
);
}

View File

@ -0,0 +1,17 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOProcessorItem.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOProcessorItem : IIOProcessorManualInvoker
{
virtual bool StopWatch() = 0;
virtual bool FailWatch() = 0;
};
}

View File

@ -0,0 +1,16 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOProcessorManualInvoker.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOProcessorManualInvoker
{
virtual void InvokeManualTick() = 0;
};
}

View File

@ -0,0 +1,19 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOWatachableIOLoopSource.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOWatachableIOLoopSource : IIOWaitableItem
{
virtual AuSPtr<Loop::ILoopSource> GetLoopSource() = 0;
virtual AuSPtr<Loop::ILoopSource> SetLoopSource(const AuSPtr<Loop::ILoopSource> &ls) = 0;
};
AUKN_SYM AuSPtr<IIOWatachableIOLoopSource> NewWaitableLoopSource(const AuSPtr<Loop::ILoopSource> &ptr);
}

View File

@ -0,0 +1,19 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOWaitableIOTimer.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOWaitableIOTimer : IIOWaitableItem
{
virtual AuUInt64 SetConstantTick(AuUInt64 ns) = 0;
virtual AuUInt64 SetTargetTimeAbs(AuUInt64 ns) = 0;
};
AUKN_SYM AuSPtr<IIOWaitableIOTimer> NewWaitableIOTimer();
}

View File

@ -0,0 +1,34 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOWaitableItem.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
// Unstable and hacky interface
// You shouldn't try to implement this yourself
// Defer to IIOWaitableIOLoopSource, IIOWaitableIOTimer, and IIOWaitableTickLimiter for intended impl usage
AUKN_INTERFACE(IIOWaitableItem,
AUI_METHOD(bool, IsRunOnTick, ()),
AUI_METHOD(bool, IsRunOnOtherTick, ()),
AUI_METHOD(bool, CanRequestTick, ()),
AUI_METHOD(void, OnReportPumper, (const AuSPtr<IIOProcessorManualInvoker> &, iface)),
AUI_METHOD(bool, IsRunOnSelfIO, ()),
AUI_METHOD(AuSPtr<Loop::ILoopSource>, GetSelfIOSource, ()),
AUI_METHOD(bool, IsRunOnSelfIOCheckedOnTimerTick, ()),
AUI_METHOD(bool, ApplyRateLimit, ())
);
}

View File

@ -0,0 +1,20 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOWaitableTickLimiter.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOWaitableTickLimiter : IIOWaitableItem
{
virtual AuUInt64 SetConstantTick(AuUInt64 ns) = 0;
virtual AuUInt64 SetMinTime(AuUInt64 ns) = 0;
virtual AuUInt32 SetMinTickDelta(AuUInt32 tickDelta) = 0;
};
AUKN_SYM AuSPtr<IIOWaitableTickLimiter> NewWaitableTickLimiter();
}

View File

@ -0,0 +1,33 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterAsyncStream.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IAsyncStreamAdapater
{
virtual bool SetFlushOnWrite(bool value) = 0;
virtual void ReserveBuffer(AuUInt length) = 0;
virtual AuUInt GetReadOffset() = 0;
virtual AuUInt SetReadOffset(AuUInt offset) = 0;
virtual AuUInt GetWriteOffset() = 0;
virtual AuUInt SetWriteOffset(AuUInt offset) = 0;
virtual AuSPtr<IStreamReader> ToStreamReader() = 0;
virtual AuSPtr<IStreamWriter> ToStreamWriter() = 0;
virtual AuSPtr<IIOWaitableItem> ToWaitable() = 0;
virtual bool Reset() = 0;
};
AUKN_SYM AuSPtr<IAsyncStreamAdapater> NewAsyncStreamAdapter(const AuSPtr<IAsyncTransaction> &transaction, bool isStream);
}

View File

@ -0,0 +1,15 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterByteBuffer.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_SYM AuSPtr<IStreamReader> NewByteBufferReadAdapter(const AuSPtr<Memory::ByteBuffer> &buffer);
AUKN_SYM AuSPtr<ISeekingReader> NewByteBufferLinearSeekableAdapter(const AuSPtr<Memory::ByteBuffer> &buffer);
AUKN_SYM AuSPtr<IStreamWriter> NewByteBufferWriteAdapter(const AuSPtr<Memory::ByteBuffer> &buffer);
}

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterCompression.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_SYM AuSPtr<IStreamReader> NewCompressionReadAdapter(const AuSPtr<Compression::ICompressionStream> &compresionStream);
AUKN_SYM AuSPtr<ISeekingReader> NewCompressionSeekingAdapter(const AuSPtr<Compression::ICompressionStream> &compresionStream);
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterCompression.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_SYM AuSPtr<IStreamReader> NewSeekingReadAdapter(const AuSPtr<ISeekingReader> &reader);
}

View File

@ -0,0 +1,41 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOExperimental.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
#include "IIOProcessorManualInvoker.hpp"
#include "IIOWaitableItem.hpp"
#include "IIOEventListener.hpp"
#include "EIOProcessorEventStage.hpp"
#include "IIOPipeEventListener.hpp"
#include "IIOPipeProcessor.hpp"
#include "IIOProcessorEventListener.hpp"
#include "IIOProcessorItem.hpp"
#include "IIOProcessor.hpp"
//#include "IBufferedWriterToReader.hpp"
//#include "IBufferedReadProcessorToReader.hpp"
#include "IIOPipeInterceptor.hpp"
#include "IIOBufferedProcessor.hpp"
#include "IByteBufferStreamPair.hpp"
#include "IOAdapterAsyncStream.hpp"
#include "IOAdapterByteBuffer.hpp"
#include "IOAdapterCompression.hpp"
#include "IOAdapterSeeking.hpp"
#include "IIOWaitableTickLimiter.hpp"
#include "IIOWaitableIOTimer.hpp"
#include "IIOWaitableIOLoopSource.hpp"
#include "IOPipeInterceptorNop.hpp"

View File

@ -0,0 +1,18 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOPipeInterceptor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUE_DEFINE(EPipeNopStrategy, (
eBuffered,
eWriteAll
));
AuSPtr<IIOPipeInterceptor> NewPipeInterceptorNopCopier(EPipeNopStrategy mode);
}

View File

@ -15,4 +15,6 @@ namespace Aurora::IO
* @return true on preemption, false on timeout
*/
AUKN_SYM bool WaitFor(AuUInt32 milliseconds, bool waitEntireFrame = true);
AUKN_SYM bool IOYield();
}

View File

@ -308,7 +308,7 @@ hacks we have available (outside of ripping the compiler apart to emit special d
experiment worth trying now that modern hardware can make up for software and microcode flaws; and architecture translation. Most users
probably wont even notice the performance loss, until it saves them from a hard crash and they realize dereferences are bloated.
This is default behaviour, and can be easily disabled or configured from within your ecosystem's AuroraConfiguration.h to globally
modify behaviour and subsequent ABI of the AuSPtr's.
modify the behaviour and subsequent ABI of the AuSPtr's.
```

View File

@ -19,6 +19,7 @@ namespace Aurora::Async
static thread_local AuWPtr<ThreadPool> gCurrentPool;
static const auto kMagicResortThreshold = 15;
static thread_local int tlsCallStack;
inline auto GetWorkerInternal(const AuSPtr<IThreadPool> &pool)
{
if (pool.get() == AuStaticCast<IAsyncApp>(gAsyncApp))
@ -134,6 +135,7 @@ namespace Aurora::Async
if (!AuTryInsert(state->workQueue, AuMakePair(target.second, runnable)))
{
DecrementTasksRunning();
runnable->CancelAsync();
return;
}
@ -248,7 +250,6 @@ namespace Aurora::Async
}
bool success {};
auto runMode = GetCurrentThreadRunMode();
do
@ -461,6 +462,22 @@ namespace Aurora::Async
int start = state->cookie;
// Account for
// while (AuAsync.GetCurrentPool()->runForever());
// in the first task (or deeper)
if (InRunnerMode() && tlsCallStack) // are we one call deep?
{
auto queue = ToKernelWorkQueue();
if ((this->tasksRunning_ == tlsCallStack) &&
(!queue || queue->GetSourceCount() <= 1))
{
return false;
}
}
//
for (auto itr = state->pendingWorkItems.begin(); itr != state->pendingWorkItems.end(); )
{
if (state->threadObject->Exiting() || state->shuttingdown)
@ -510,12 +527,16 @@ namespace Aurora::Async
// Remove from our local job queue
itr = state->pendingWorkItems.erase(itr);
tlsCallStack++;
// Dispatch
oops->RunAsync();
// Atomically decrement global task counter
runningTasks = this->tasksRunning_.fetch_sub(1) - 1;
tlsCallStack--;
if (start != state->cookie)
{
start = state->cookie;
@ -523,7 +544,6 @@ namespace Aurora::Async
}
}
gCurrentPool = oldTlsHandle;
// Return popped work back to the groups work pool when our -pump loops were preempted
@ -538,9 +558,17 @@ namespace Aurora::Async
CtxPollReturn(state, magic, true);
// Account for
// while (AuAsync.GetCurrentPool()->runForever());
// in the top most task
if (InRunnerMode())
{
if (runningTasks == 0)
auto queue = ToKernelWorkQueue();
if ((runningTasks == 0) &&
(this->tasksRunning_ == 0 ) &&
(!queue || queue->GetSourceCount() <= 1))
{
Shutdown();
}
@ -760,7 +788,9 @@ namespace Aurora::Async
feature->Init();
}));
auto workItem = this->NewWorkItem(id, work, !async)->Dispatch();
SysAssert(workItem);
if (!async)
{

View File

@ -1873,6 +1873,25 @@ namespace Aurora::Console::ConsoleTTY
return this->bottomSubHeaderPadding;
}
AuUInt8 TTYConsole::GetPaddingHeadOfLog(AuUInt8 newValue)
{
return AuExchange(this->topLogPadding, newValue);
}
AuUInt8 TTYConsole::GetPaddingTopOfLog(AuUInt8 newValue)
{
return AuExchange(this->topLogPaddingExtra, newValue);
}
AuUInt8 TTYConsole::GetPaddingHeadOfLog()
{
return this->topLogPadding;
}
AuUInt8 TTYConsole::GetPaddingTopOfLog()
{
return this->topLogPaddingExtra;
}
void Init()
{

View File

@ -128,26 +128,30 @@ namespace Aurora::Console::ConsoleTTY
virtual bool SetLeftBorder(bool newValue) override;
virtual bool SetRightBorder(bool newValue) override;
AuUInt8 SetPaddingLeftOfLog(AuUInt8 newValue) override;
AuUInt8 SetPaddingRightOfLog(AuUInt8 newValue) override;
AuUInt8 SetPaddingLeftOfInput(AuUInt8 newValue) override;
AuUInt8 SetPaddingTopOfInput(AuUInt8 newValue) override;
AuUInt8 SetPaddingTopOfHint(AuUInt8 newValue) override;
AuUInt8 SetPaddingTopOfHeader(AuUInt8 newValue) override;
AuUInt8 SetPaddingMidOfHeader(AuUInt8 newValue) override;
AuUInt8 SetPaddingBottomOfSubheader(AuUInt8 newValue) override;
AuUInt8 GetPaddingLeftOfLog() override;
AuUInt8 GetPaddingRightOfLog() override;
AuUInt8 GetPaddingLeftOfInput() override;
AuUInt8 GetPaddingTopOfInput() override;
AuUInt8 GetPaddingTopOfHint() override;
AuUInt8 GetPaddingTopOfHeader() override;
AuUInt8 GetPaddingMidOfHeader() override;
AuUInt8 GetPaddingBottomOfSubheader() override;
virtual AuUInt8 SetPaddingLeftOfLog(AuUInt8 newValue) override;
virtual AuUInt8 SetPaddingRightOfLog(AuUInt8 newValue) override;
virtual AuUInt8 SetPaddingLeftOfInput(AuUInt8 newValue) override;
virtual AuUInt8 SetPaddingTopOfInput(AuUInt8 newValue) override;
virtual AuUInt8 SetPaddingTopOfHint(AuUInt8 newValue) override;
virtual AuUInt8 SetPaddingTopOfHeader(AuUInt8 newValue) override;
virtual AuUInt8 SetPaddingMidOfHeader(AuUInt8 newValue) override;
virtual AuUInt8 SetPaddingBottomOfSubheader(AuUInt8 newValue) override;
virtual AuUInt8 GetPaddingLeftOfLog() override;
virtual AuUInt8 GetPaddingRightOfLog() override;
virtual AuUInt8 GetPaddingLeftOfInput() override;
virtual AuUInt8 GetPaddingTopOfInput() override;
virtual AuUInt8 GetPaddingTopOfHint() override;
virtual AuUInt8 GetPaddingTopOfHeader() override;
virtual AuUInt8 GetPaddingMidOfHeader() override;
virtual AuUInt8 GetPaddingBottomOfSubheader() override;
virtual AuUInt8 GetPaddingHeadOfLog(AuUInt8 newValue) override;
virtual AuUInt8 GetPaddingTopOfLog(AuUInt8 newValue) override;
virtual AuUInt8 GetPaddingHeadOfLog() override;
virtual AuUInt8 GetPaddingTopOfLog() override;
void WriteBuffered(AuPair<AuUInt32, AuUInt32> pos, const AuString &in);
void WriteLine(int Y, const AuString &in);
void BlankLine(int Y, bool borders = true);
@ -168,7 +172,6 @@ namespace Aurora::Console::ConsoleTTY
int oldWidth {};
int oldHeight {};
int iHistoryPos {-1};
int iHistoryWritePos {0};
AuList<AuString> history;
@ -235,8 +238,8 @@ namespace Aurora::Console::ConsoleTTY
int leftLogPadding {1};
int rightLogPadding {1};
int topLogPadding {0}; // extra padding, if displaying header
int topLogPaddingExtra {1}; // this is backwards
int topLogPadding {0}; // extra padding, if displaying header ...
int topLogPaddingExtra {1}; // this is backwards ...
int leftInputPadding {0};
int topInputPadding {0};

View File

@ -21,13 +21,17 @@ namespace Aurora::HWInfo
AuString contents;
if (AuIOFS::ReadString(path, contents))
{
char *endPtr;
auto word = strtoll(contents.c_str(), &endPtr, 10);
if (errno == ERANGE) return 0;
return word;
return 0;
}
return 0;
char *endPtr;
auto word = strtoll(contents.c_str(), &endPtr, 10);
if (errno == ERANGE)
{
return 0;
}
return word;
}
static AuString ReadString(const AuString &path)
@ -79,7 +83,7 @@ namespace Aurora::HWInfo
auto cpuId = CpuBitId(threadId);
auto coreID = ReadUInt(kBasePath + coreStr + "/topology/core_id");
auto cpuList = ReadString(kBasePath + coreStr +"/topology/core_cpus_list");
auto cpuList = ReadString(kBasePath + coreStr + "/topology/core_cpus_list");
auto isHVCore = AuStringContains(cpuList, ",");

View File

@ -0,0 +1,84 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ByteBufferStreamPair.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "ByteBufferStreamPair.hpp"
namespace Aurora::IO
{
ByteBufferStreamPair::ByteBufferStreamPair(const AuSPtr<Memory::ByteBuffer> &buffer) :
buffer(buffer),
reader(buffer),
writer(buffer),
seeker(buffer)
{
}
AuSPtr<IStreamReader> ByteBufferStreamPair::ToStreamReader()
{
return AuSPtr<IStreamReader>(AuSharedFromThis(), AuStaticCast<IStreamReader>(&this->reader));
}
AuSPtr<ISeekingReader> ByteBufferStreamPair::ToSeekingReader()
{
if (this->buffer->flagCircular)
{
SysPushErrorIO("ring buffer must not be seekable");
return {};
}
return AuSPtr<ISeekingReader>(AuSharedFromThis(), AuStaticCast<ISeekingReader>(&this->seeker));
}
AuSPtr<IStreamWriter> ByteBufferStreamPair::ToStreamWriter()
{
return AuSPtr<IStreamWriter>(AuSharedFromThis(), AuStaticCast<IStreamWriter>(&this->writer));
}
AuSPtr<Memory::ByteBuffer> ByteBufferStreamPair::ToByteBuffer()
{
return this->buffer;
}
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPair()
{
auto scalable = AuMakeShared<AuByteBuffer>();
if (!scalable)
{
SysPushErrorMem();
return {};
}
return AuMakeShared<ByteBufferStreamPair>(scalable);
}
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewByteBufferPairEx(AuUInt length, bool permitResize)
{
auto buffer = AuMakeShared<AuByteBuffer>(length, false, permitResize);
if (!buffer)
{
SysPushErrorMem();
return {};
}
return AuMakeShared<ByteBufferStreamPair>(buffer);
}
AUKN_SYM AuSPtr<IByteBufferStreamPair> NewRingByteBuffer(AuUInt length)
{
auto buffer = AuMakeShared<AuByteBuffer>(length, false, false);
if (!buffer)
{
SysPushErrorMem();
return {};
}
return AuMakeShared<ByteBufferStreamPair>(buffer);
}
}

View File

@ -0,0 +1,27 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ByteBufferStreamPair.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct ByteBufferStreamPair : IByteBufferStreamPair, AuEnableSharedFromThis<ByteBufferStreamPair>
{
ByteBufferStreamPair(const AuSPtr<Memory::ByteBuffer> &buffer);
AuSPtr<IStreamReader> ToStreamReader() override;
AuSPtr<ISeekingReader> ToSeekingReader() override;
AuSPtr<IStreamWriter> ToStreamWriter() override;
AuSPtr<Memory::ByteBuffer> ToByteBuffer() override;
AuSPtr<Memory::ByteBuffer> buffer;
Buffered::BlobReader reader;
Buffered::BlobWriter writer;
Buffered::BlobSeekableReader seeker;
};
}

View File

@ -240,7 +240,9 @@ namespace Aurora::IO::FS
{
that->pin_.reset();
that->Reset();
SysPushErrorFIO("Async FIO error: {} {}", that->GetFileHandle()->path, GetLastError());
that->osErrorCode = GetLastError();
that->hasFailed = true;
SysPushErrorFIO("QoA async FIO error: {} {}", that->GetFileHandle()->path, that->osErrorCode);
return false;
}
}
@ -253,6 +255,13 @@ namespace Aurora::IO::FS
{
auto transaction = reinterpret_cast<NtAsyncFileTransaction *>(reinterpret_cast<AuUInt8*>(lpOverlapped) - offsetof(NtAsyncFileTransaction, overlap_));
auto hold = AuExchange(transaction->pin_, {});
if (dwErrorCode)
{
hold->hasFailed = true;
hold->osErrorCode = dwErrorCode;
}
hold->Complete();
}
@ -289,6 +298,7 @@ namespace Aurora::IO::FS
::ResetEvent(this->event_);
this->memoryHold_ = memoryView;
this->hasFailed = false;
this->lastAbstractStat_ = memoryView->length;
this->lastAbstractOffset_ = offset;
@ -310,6 +320,8 @@ namespace Aurora::IO::FS
this->latch_ = false;
::ResetEvent(this->event_);
this->hasFailed = false;
this->memoryHold_ = memoryView;
this->lastAbstractStat_ = memoryView->length;
@ -340,12 +352,23 @@ namespace Aurora::IO::FS
void NtAsyncFileTransaction::Reset()
{
::ResetEvent(this->event_);
this->hasFailed = false;
this->lastAbstractStat_ = 0; // do not use latch
}
bool NtAsyncFileTransaction::Failed()
{
return this->hasFailed;
}
AuUInt NtAsyncFileTransaction::GetOSErrorCode()
{
return Failed() ? this->osErrorCode : ERROR_SUCCESS;
}
bool NtAsyncFileTransaction::Complete()
{
DWORD read;
DWORD read {};
if (!this->lastAbstractStat_)
{
@ -357,7 +380,8 @@ namespace Aurora::IO::FS
return false;
}
if (GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false))
if ((this->hasFailed) ||
::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false))
{
bool bLatched = this->latch_;
DispatchCb(read);
@ -370,7 +394,12 @@ namespace Aurora::IO::FS
AuUInt32 NtAsyncFileTransaction::GetLastPacketLength()
{
DWORD read {};
GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false);
if (!::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false))
{
return {};
}
return read;
}
@ -386,7 +415,7 @@ namespace Aurora::IO::FS
return true;
}
auto ret = WaitForSingleObjectEx(this->event_, timeout ? timeout : INFINITE, true);
auto ret = ::WaitForSingleObjectEx(this->event_, timeout ? timeout : INFINITE, true);
if (ret == WAIT_OBJECT_0)
{
return Complete();
@ -438,7 +467,7 @@ namespace Aurora::IO::FS
return {};
}
stream->Init(fileHandle);
stream->Init(fileHandle);
return stream;
}

View File

@ -47,7 +47,11 @@ namespace Aurora::IO::FS
bool StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView) override;
bool StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView) override;
bool Complete() override;
bool Complete() override;
bool Failed() override;
AuUInt GetOSErrorCode() override;
AuUInt32 GetLastPacketLength() override;
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) override;
@ -70,6 +74,9 @@ namespace Aurora::IO::FS
AuUInt32 lastAbstractStat_ {}, lastAbstractOffset_ {};
bool latch_ {};
AuUInt32 osErrorCode {};
bool hasFailed {};
private:
AuSPtr<void> memoryHold_;
AuSPtr<FileHandle> handle_;

View File

@ -0,0 +1,545 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterAsyncStream.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOAdapterAsyncStream.hpp"
#include "IOWaitableIOLoopSource.hpp"
namespace Aurora::IO
{
struct AsyncStreamAdapater;
struct AsyncStreamReader : IStreamReader
{
AsyncStreamAdapater *parent;
EStreamError IsOpen() override;
EStreamError Read(const Memory::MemoryViewStreamWrite &parameters) override;
void Close() override;
};
struct AsyncStreamMemory : AuMemoryViewWrite
{
AsyncStreamMemory(AuUInt length);
~AsyncStreamMemory();
bool IsValid();
AuUInt streamIndex {};
};
struct AsyncStreamWriter : IStreamWriter
{
AsyncStreamAdapater *parent;
~AsyncStreamWriter();
EStreamError IsOpen() override;
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override;
void Close() override;
void Flush() override;
void Preframe();
void Frame();
AuList<AuSPtr<AsyncStreamMemory>> writesPending;
bool HasWorkItems();
};
struct AsyncStreamAdapater : IAsyncStreamAdapater, AuEnableSharedFromThis<AsyncStreamAdapater>
{
AsyncStreamAdapater();
AuSPtr<AsyncStreamMemory> AllocateNextPageCached(AuUInt length);
virtual AuSPtr<IStreamReader> ToStreamReader() override;
virtual AuSPtr<IStreamWriter> ToStreamWriter() override;
virtual AuSPtr<IIOWaitableItem> ToWaitable() override;
virtual bool Reset() override;
bool Init(const AuSPtr<IAsyncTransaction> &transaction, bool isStream);
AuSPtr<AsyncStreamMemory> lastAllocation;
AuSPtr<IAsyncTransaction> transaction;
bool SetFlushOnWrite(bool value) override;
void ReserveBuffer(AuUInt length) override;
AuUInt GetReadOffset();
AuUInt SetReadOffset(AuUInt offset);
AuUInt GetWriteOffset();
AuUInt SetWriteOffset(AuUInt offset);
bool asyncActive {};
AuUInt readOffset {};
AuUInt writeOffset {};
bool isStream {};
bool flushOnWrite {true};
AuOptionalEx<EStreamError> errorCode;
int locked {};
AsyncStreamReader reader;
AsyncStreamWriter writer;
IOWatachableIOLoopSource source;
};
AsyncStreamMemory::AsyncStreamMemory(AuUInt length) : AuMemoryViewWrite(AuMemory::ZAlloc<AuUInt8*>(length), length)
{
}
AsyncStreamMemory::~AsyncStreamMemory()
{
if (this->ptr)
{
AuMemory::Free(this->ptr);
this->ptr = nullptr;
}
}
bool AsyncStreamMemory::IsValid()
{
return bool(this->ptr);
}
bool AsyncStreamAdapater::SetFlushOnWrite(bool value)
{
return AuExchange(this->flushOnWrite, value);
}
AuUInt AsyncStreamAdapater::GetReadOffset()
{
return this->readOffset;
}
AuUInt AsyncStreamAdapater::SetReadOffset(AuUInt offset)
{
if (this->locked == 1)
{
this->writer.Preframe();
}
return AuExchange(this->readOffset, offset);
}
AuUInt AsyncStreamAdapater::GetWriteOffset()
{
if (this->locked == 1)
{
this->writer.Preframe();
}
return this->writeOffset;
}
AuUInt AsyncStreamAdapater::SetWriteOffset(AuUInt offset)
{
return AuExchange(this->writeOffset, offset);
}
bool AsyncStreamAdapater::Init(const AuSPtr<IAsyncTransaction> &transaction, bool isStream)
{
this->transaction = transaction;
this->lastAllocation.reset();
this->asyncActive = false;
this->reader.parent = this;
this->writer.parent = this;
return true;
}
AsyncStreamAdapater::AsyncStreamAdapater() : source({})
{
}
AuSPtr<AsyncStreamMemory> AsyncStreamAdapater::AllocateNextPageCached(AuUInt length)
{
if (this->lastAllocation)
{
if (this->lastAllocation->length >= length)
{
return this->lastAllocation;
}
}
auto newMem = AuMakeShared<AsyncStreamMemory>(length);
if (!newMem)
{
SysPushErrorMem();
return {};
}
if (!newMem->IsValid())
{
SysPushErrorMem();
return {};
}
return this->lastAllocation = newMem;
}
EStreamError AsyncStreamReader::IsOpen()
{
return this->parent->errorCode.HasValue() ?
this->parent->errorCode.value() :
EStreamError::eErrorNone;
}
EStreamError AsyncStreamReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{
if (!parameters.length)
{
SysPushErrorArg();
return EStreamError::eErrorEndOfStream;
}
// Read from the last tranaction, if not fully consumed
if (parent->lastAllocation)
{
auto length = parent->transaction->GetLastPacketLength();
if (length &&
parent->lastAllocation->streamIndex != length)
{
auto toRead = AuMin<AuUInt>(parameters.length, length - parent->lastAllocation->streamIndex);
if (toRead)
{
if (parameters.ptr)
{
AuMemcpy(parameters.ptr, parent->lastAllocation->Begin<AuUInt8>() + parent->lastAllocation->streamIndex, toRead);
if (parent->isStream)
{
parent->lastAllocation->streamIndex += toRead;
}
else
{
parent->lastAllocation->streamIndex += length;
}
}
}
if (parent->isStream)
{
parent->lastAllocation->streamIndex += length;
}
parameters.outVariable = toRead;
return EStreamError::eErrorNone;
}
if (parent->transaction && parent->transaction->Failed())
{
SysPushErrorIO("AIO transaction read failed: {}", parent->transaction->GetOSErrorCode());
parent->errorCode = EStreamError::eErrorStreamInterrupted;
parent->lastAllocation.reset();
}
}
// Async error
if (parent->errorCode.HasValue())
{
auto code = parent->isStream ?
parent->errorCode.Value() :
AuExchange(parent->errorCode, {}).Value();
if (code != EStreamError::eErrorNone)
{
return code;
}
}
// Async awaiting response
if (parent->asyncActive && !parent->transaction->Complete())
{
parameters.outVariable = 0;
return EStreamError::eErrorNone;
}
// Async success or blank state
parent->transaction->Reset();
parent->asyncActive = true;
parent->lastAllocation = parent->AllocateNextPageCached(parameters.length);
parent->lastAllocation->streamIndex = 0;
if (!parent->transaction->StartRead(parent->isStream ? 0 : parent->readOffset, parent->lastAllocation))
{
parent->asyncActive = false;
SysPushErrorNested("Couldn't start async aio read");
return EStreamError::eErrorStreamInterrupted;
}
return EStreamError::eErrorNone;
}
void AsyncStreamReader::Close()
{
}
AsyncStreamWriter::~AsyncStreamWriter()
{
Flush();
}
EStreamError AsyncStreamWriter::IsOpen()
{
return this->parent->errorCode.HasValue() ?
this->parent->errorCode.value() :
EStreamError::eErrorNone;
}
EStreamError AsyncStreamWriter::Write(const Memory::MemoryViewStreamRead &parameters)
{
if (!parameters.ptr)
{
return EStreamError::eErrorStreamInterrupted;
}
Preframe();
if (parent->errorCode.HasValue())
{
auto code = parent->isStream ?
parent->errorCode.Value() :
AuExchange(parent->errorCode, {}).Value();
if (code != EStreamError::eErrorNone)
{
return code;
}
}
auto newMem = AuMakeShared<AsyncStreamMemory>(parameters.length);
if (!newMem)
{
SysPushErrorMem();
return EStreamError::eErrorStreamInterrupted;
}
if (!newMem->IsValid())
{
SysPushErrorMem();
return EStreamError::eErrorStreamInterrupted;
}
AuMemcpy(newMem->ptr, parameters.ptr, parameters.length);
parameters.outVariable = parameters.length;
if (!AuTryInsert(this->writesPending, newMem))
{
SysPushErrorMem();
return EStreamError::eErrorStreamInterrupted;
}
if (this->parent->flushOnWrite)
{
Frame();
}
return EStreamError::eErrorNone;
}
void AsyncStreamWriter::Flush()
{
Preframe();
Frame();
}
void AsyncStreamWriter::Close()
{
Flush();
}
void AsyncStreamWriter::Preframe()
{
if (parent->transaction->Complete())
{
if (parent->transaction->Failed())
{
SysPushErrorIO("AIO transaction write failed: {}", parent->transaction->GetOSErrorCode());
parent->errorCode = EStreamError::eErrorStreamInterrupted;
parent->lastAllocation.reset();
}
else
{
parent->transaction->GetLastPacketLength();
}
parent->transaction->Reset();
}
}
void AsyncStreamWriter::Frame()
{
AuSPtr<AsyncStreamMemory> buffer;
if (this->writesPending.size() == 1)
{
buffer = AuMove(this->writesPending[0]);
}
else
{
AuUInt length {};
for (auto &a : this->writesPending)
{
length += a->length;
}
buffer = this->parent->AllocateNextPageCached(length);
if (!buffer)
{
return;
}
AuUInt index {};
for (auto &a : this->writesPending)
{
if (a->length + index > buffer->length)
{
SysPanic("");
}
AuMemcpy(buffer->Begin<AuUInt8>() + index, a->ptr, a->length);
index += a->length;
}
//....
}
// Async success or blank state
parent->transaction->Reset();
parent->asyncActive = true;
struct WriteMem : AuMemoryViewRead
{
AuSPtr<AsyncStreamMemory> write;
};
auto annoying = AuMakeShared<WriteMem>();
if (!annoying)
{
SysPushErrorMem();
return;
}
annoying->write = buffer;
annoying->ptr = buffer->ptr;
annoying->length = buffer->length;
parent->lastAllocation = buffer;
parent->lastAllocation->streamIndex = 0;
if (!parent->transaction->StartWrite(parent->isStream ? 0 : parent->writeOffset, annoying))
{
parent->asyncActive = false;
SysPushErrorNested("Couldn't start async aio write");
return;
}
this->writesPending.clear();
}
bool AsyncStreamWriter::HasWorkItems()
{
return this->writesPending.size();
}
AuSPtr<IStreamReader> AsyncStreamAdapater::ToStreamReader()
{
if (this->locked != 0 && this->locked != 2)
{
return {};
}
this->locked = 2;
return AuSPtr<IStreamReader>(AuSharedFromThis(), &this->reader);
}
AuSPtr<IStreamWriter> AsyncStreamAdapater::ToStreamWriter()
{
if (this->locked != 0 && this->locked != 1)
{
return {};
}
this->locked = 1;
return AuSPtr<IStreamWriter>(AuSharedFromThis(), &this->writer);
}
AuSPtr<IIOWaitableItem> AsyncStreamAdapater::ToWaitable()
{
this->source.SetLoopSource(this->transaction->NewLoopSource());
return AuSPtr<IIOWaitableItem>(AuSharedFromThis(), &this->source);
}
void AsyncStreamAdapater::ReserveBuffer(AuUInt length)
{
if (!this->lastAllocation || !this->asyncActive)
{
this->lastAllocation = this->AllocateNextPageCached(length);
}
}
bool AsyncStreamAdapater::Reset()
{
if (this->locked == 1)
{
if (this->writer.HasWorkItems())
{
return false;
}
this->writer.Flush();
}
if (this->asyncActive)
{
if (!this->transaction->Complete())
{
return false;
}
}
this->locked = 0;
this->transaction->Reset();
this->writeOffset = 0;
this->readOffset = 0;
return true;
}
AUKN_SYM AuSPtr<IAsyncStreamAdapater> NewAsyncStreamAdapter(const AuSPtr<IAsyncTransaction> &transaction, bool isStream)
{
if (!transaction)
{
SysPushErrorArg();
return {};
}
auto adapter = AuMakeShared<AsyncStreamAdapater>();
if (!adapter)
{
return {};
}
if (!adapter->Init(transaction, isStream))
{
return {};
}
return adapter;
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterAsyncStream.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
}

View File

@ -0,0 +1,52 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterByteBuffer.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOAdapterByteBuffer.hpp"
namespace Aurora::IO
{
AUKN_SYM AuSPtr<IStreamReader> NewByteBufferReadAdapter(const AuSPtr<Memory::ByteBuffer> &buffer)
{
if (!buffer)
{
SysPushErrorArg();
return {};
}
return AuMakeShared<Buffered::BlobReader>(buffer);
}
AUKN_SYM AuSPtr<ISeekingReader> NewByteBufferLinearSeekableAdapter(const AuSPtr<Memory::ByteBuffer> &buffer)
{
if (!buffer)
{
SysPushErrorArg();
return {};
}
if (buffer->flagCircular)
{
SysPushErrorIO("Seekable buffer must not be circular");
return {};
}
return AuMakeShared<Buffered::BlobSeekableReader>(buffer);
}
AUKN_SYM AuSPtr<IStreamWriter> NewByteBufferWriteAdapter(const AuSPtr<Memory::ByteBuffer> &buffer)
{
if (!buffer)
{
SysPushErrorArg();
return {};
}
return AuMakeShared<Buffered::BlobWriter>(buffer);
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterByteBuffer.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
}

View File

@ -0,0 +1,103 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterCompression.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOAdapterCompression.hpp"
namespace Aurora::IO
{
CompressionStreamReader::CompressionStreamReader(const AuSPtr<Compression::ICompressionStream> &compressionStream) : compressionStream(compressionStream)
{
}
EStreamError CompressionStreamReader::IsOpen()
{
return this->errored_ ? EStreamError::eErrorEndOfStream : EStreamError::eErrorNone;
}
EStreamError CompressionStreamReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{
auto pair = this->compressionStream->ReadEx(parameters, true);
if (pair == AuStreamReadWrittenPair_t{})
{
this->errored_ = true;
return EStreamError::eErrorEndOfStream;
}
parameters.outVariable = pair.second;
return EStreamError::eErrorNone;
}
void CompressionStreamReader::Close()
{
}
CompressionSeekingReader::CompressionSeekingReader(const AuSPtr<Compression::ICompressionStream> &compressionStream) : compressionStream(compressionStream)
{
}
EStreamError CompressionSeekingReader::IsOpen()
{
return this->errored_ ?
EStreamError::eErrorEndOfStream :
EStreamError::eErrorNone;
}
EStreamError CompressionSeekingReader::ArbitraryRead(AuUInt offset, const Memory::MemoryViewStreamWrite &parameters)
{
if (this->offset != offset)
{
if (this->offset > offset)
{
if (!this->compressionStream->GoBackByProcessedN(this->offset - offset))
{
SysPushErrorIO("Negative compression seek out of bounds");
return EStreamError::eErrorStreamInterrupted;
}
}
else
{
if (!this->compressionStream->GoForwardByProcessedN(offset - this->offset))
{
SysPushErrorIO("Positive compression seek out of bounds");
return EStreamError::eErrorStreamInterrupted;
}
}
this->offset = offset;
}
auto pair = this->compressionStream->ReadEx(parameters, true);
if (pair == AuStreamReadWrittenPair_t {})
{
this->errored_ = true;
return EStreamError::eErrorEndOfStream;
}
this->offset += pair.second;
parameters.outVariable = pair.second;
return EStreamError::eErrorNone;
}
void CompressionSeekingReader::Close()
{
}
AUKN_SYM AuSPtr<IStreamReader> NewCompressionReadAdapter(const AuSPtr<Compression::ICompressionStream> &compresionStream)
{
return AuMakeShared<CompressionStreamReader>(compresionStream);
}
AUKN_SYM AuSPtr<ISeekingReader> NewCompressionSeekingAdapter(const AuSPtr<Compression::ICompressionStream> &compresionStream)
{
return AuMakeShared<CompressionSeekingReader>(compresionStream);
}
}

View File

@ -0,0 +1,38 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterCompression.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct CompressionStreamReader : IStreamReader
{
AuSPtr<Compression::ICompressionStream> compressionStream;
CompressionStreamReader(const AuSPtr<Compression::ICompressionStream> &compressionStream);
EStreamError IsOpen() override;
EStreamError Read(const Memory::MemoryViewStreamWrite &parameters) override;
void Close() override;
bool errored_ {};
};
struct CompressionSeekingReader : ISeekingReader
{
AuSPtr<Compression::ICompressionStream> compressionStream;
CompressionSeekingReader(const AuSPtr<Compression::ICompressionStream> &compressionStream);
EStreamError IsOpen() override;
EStreamError ArbitraryRead(AuUInt offset, const Memory::MemoryViewStreamWrite &parameters) override;
void Close() override;
bool errored_ {};
AuUInt offset {};
};
}

View File

@ -0,0 +1,46 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterSeeking.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOAdapterSeeking.hpp"
namespace Aurora::IO
{
SeekingReader::SeekingReader(const AuSPtr<ISeekingReader> &reader) : reader(reader)
{
}
EStreamError SeekingReader::IsOpen()
{
return this->reader ? this->reader->IsOpen() : EStreamError::eErrorStreamNotOpen;
}
EStreamError SeekingReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{
auto error = this->reader ? this->reader->ArbitraryRead(this->index, parameters) : EStreamError::eErrorStreamNotOpen;
if (error == EStreamError::eErrorNone)
{
this->index += parameters.outVariable;
}
return error;
}
void SeekingReader::Close()
{
if (this->reader)
{
this->reader->Close();
}
}
AUKN_SYM AuSPtr<IStreamReader> NewSeekingReadAdapter(const AuSPtr<ISeekingReader> &reader)
{
return AuMakeShared<SeekingReader>(reader);
}
}

View File

@ -0,0 +1,23 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterSeeking.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct SeekingReader : IStreamReader
{
AuSPtr<ISeekingReader> reader;
AuUInt index {};
SeekingReader(const AuSPtr<ISeekingReader> &reader);
virtual EStreamError IsOpen() override;
virtual EStreamError Read(const Memory::MemoryViewStreamWrite &parameters) override;
virtual void Close() override;
};
}

View File

@ -0,0 +1,130 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOBufferedProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOBufferedProcessor.hpp"
namespace Aurora::IO
{
struct IOBufferedProcessor : IIOBufferedProcessor
{
AuSPtr<IStreamReader> source;
AuSPtr<IStreamWriter> drain;
AuSPtr<IIOPipeInterceptor> processor;
AuUInt32 bufferSize {};
AuByteBuffer buffer;
AU_DEFINE_FOR_VA(IOBufferedProcessor,
(AU_DEFINE_CTOR_VA, // initializer-list-like ctor (extending a struct or adding a ctor will break initializer lists)
AU_DEFINE_THIS_MOVE_CTOR_VA, // add move `Object(Object &&)`
AU_DEFINE_EQUALS_VA, // add equals operator
AU_DEFINE_MOVE_VA, // add move assignment operator
AU_DEFINE_COPY_VA), // add copy assignment operator
(source, drain, processor, bufferSize));
AuUInt32 TryProcessBuffered() override;
AuUInt32 GetRawBytesBuffered() override;
AuUInt32 GetRawBytesLimit() override;
AuUInt32 TryPump();
};
AuUInt32 IOBufferedProcessor::TryProcessBuffered()
{
if (this->buffer.IsEmpty())
{
this->buffer.Allocate(this->bufferSize);
this->buffer.flagCircular = true; // !!!
}
if (this->buffer.IsEmpty())
{
return TryPump();
}
AuUInt canBuffer = this->buffer.RemainingWrite();
canBuffer = AuMin(canBuffer, AuUInt((this->buffer.length + this->buffer.base) - this->buffer.writePtr));
AuUInt read {};
try
{
if (this->source->Read(AuMemoryViewStreamWrite(AuMemoryViewWrite(this->buffer.writePtr, canBuffer), read)) !=
AuIO::EStreamError::eErrorNone)
{
return TryPump();
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer.writePtr += read;
if (this->buffer.writePtr == this->buffer.base + this->buffer.length)
{
this->buffer.writePtr = this->buffer.base;
}
return TryPump();
}
AuUInt32 IOBufferedProcessor::TryPump()
{
AuUInt bytesProcessedTotal {};
AuUInt bytesProcessed {};
do
{
AuUInt canRead = this->buffer.RemainingBytes();
canRead = AuMin<AuUInt>(canRead, (this->buffer.length + this->buffer.base) - this->buffer.readPtr);
try
{
if (!this->processor->OnDataAvailable(AuMemoryViewStreamRead(AuMemoryViewRead(this->buffer.readPtr, canRead), bytesProcessed), this->drain))
{
break;
}
}
catch (...)
{
SysPushErrorCatch();
}
this->buffer.readPtr += bytesProcessed;
bytesProcessedTotal += bytesProcessed;
if (this->buffer.readPtr == this->buffer.base + this->buffer.length)
{
this->buffer.readPtr = this->buffer.base;
}
}
while (AuExchange(bytesProcessed, 0));
return bytesProcessedTotal;
}
AuUInt32 IOBufferedProcessor::GetRawBytesBuffered()
{
return this->buffer.RemainingBytes();
}
AuUInt32 IOBufferedProcessor::GetRawBytesLimit()
{
return this->bufferSize;
}
AUKN_SYM AuSPtr<IIOBufferedProcessor> NewBufferedProcessor(const AuSPtr<IStreamReader> &source,
const AuSPtr<IIOPipeInterceptor> &processor,
const AuSPtr<IStreamWriter> &drain,
AuUInt32 bufferSize)
{
return AuMakeShared<IOBufferedProcessor>(source, drain, processor, bufferSize);
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: BufferedProcessor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
}

View File

@ -0,0 +1,15 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOPipeProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOPipeProcessor.hpp"
namespace Aurora::IO
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOPipeProcessor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
}

763
Source/IO/IOProcessor.cpp Normal file
View File

@ -0,0 +1,763 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessor.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOProcessorItem.hpp"
#include "IOProcessorItems.hpp"
#include "IOProcessor.hpp"
namespace Aurora::IO
{
IOProcessor::IOProcessor(AuUInt threadId, bool tickOnly,
AuAsync::WorkerPId_t worker,
const AuSPtr<Loop::ILoopQueue> &loop) : mutliplexIOAndTimer(!tickOnly), loopQueue(loop), asyncWorker(worker), threadId(threadId)
{
}
IOProcessor::~IOProcessor()
{
this->ReleaseAllWatches();
}
bool IOProcessor::Init()
{
if (!this->items.Init())
{
SysPushErrorNested();
return false;
}
if (!this->timers.Init(this, AuLoop::NewLSTimer(0)))
{
SysPushErrorNested();
return false;
}
return true;
}
bool IOProcessor::QueueIOEvent(const AuSPtr<IIOProcessorItem> &ioEvent)
{
return this->items.AddFrameOrFallback(AuStaticCast<IOProcessorItem>(ioEvent));
}
AuUInt32 IOProcessor::TryTick()
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
return 0;
}
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameRunThreadIO();
FrameRunCheckLSes();
bool bHasAlertedItems = this->items.workSignaled.size() ||
this->timers.nbTicker.HasPassed();
if (!bHasAlertedItems)
{
ReportState(EIOProcessorEventStage::eFrameEndOfFrame);
return 0;
}
return FrameRunEpilogue();
}
bool IOProcessor::TickForRegister(const AuSPtr<IIOProcessorItem> &ioEvent)
{
return this->items.AddFrameOrFallback(AuStaticCast<IOProcessorItem>(ioEvent));
}
AuUInt32 IOProcessor::TickFor(const AuSPtr<IIOProcessorItem> &ioEvent)
{
SysAssert(TickForRegister(ioEvent));
if (IsTickOnly())
{
return 0;
}
if (this->bFrameStart)
{
return 0;
}
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
return 0;
}
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
AuUInt32 IOProcessor::RunTick()
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
AuThreading::Sleep(1);
return 0;
}
this->bFrameStart = false;
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameWaitForAny(0);
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
AuUInt32 IOProcessor::ManualTick()
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
return 0;
}
this->bFrameStart = false;
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
AuUInt IOProcessor::FrameRunEpilogue()
{
FrameRunAlerted();
FrameRunAlertedSniffers();
FrameEndOfFrameEvents();
return FrameFinalize();
}
void IOProcessor::FrameRunCheckLSes()
{
if (!this->IsTickOnly())
{
return;
}
for (auto &item : this->items.allItems)
{
if (item->item->IsRunOnSelfIO() && item->item->IsRunOnSelfIOCheckedOnTimerTick())
{
auto ls = item->item->GetSelfIOSource();
if (ls && ls->IsSignaled())
{
// Should deadlock without critical sections
this->QueueIOEvent(item);
}
}
}
}
void IOProcessor::FrameRunAlerted()
{
for (auto &a : this->items.onTickReceivers)
{
if (a->listener)
{
try
{
a->listener->Tick_RunOnTick();
if (!AuExists(this->items.workSignaled, a) &&
!AuExists(this->items.onTickReceivers, a))
{
a->listener->Tick_Any();
}
}
catch (...)
{
SysPushErrorCatch();
}
}
SysAssert(AuTryInsert(this->items.finalizeQueue, a));
}
for (auto &a : this->items.workSignaled)
{
if (a->listener)
{
try
{
a->listener->Tick_SelfIOEvent();
a->listener->Tick_Any();
}
catch (...)
{
SysPushErrorCatch();
}
}
SysAssert(AuTryInsert(this->items.finalizeQueue, a));
}
}
void IOProcessor::FrameRunAlertedSniffers()
{
if (this->items.workSignaled.empty())
{
return;
}
for (auto &a : this->items.onOtherReceivers)
{
if (AuExists(this->items.workSignaled, a))
{
continue;
}
if (a->listener)
{
try
{
a->listener->Tick_OtherIOEvent();
a->listener->Tick_Any();
}
catch (...)
{
SysPushErrorCatch();
SysAssert(a->FailWatch());
}
}
this->items.finalizeQueue.push_back(a);
}
}
void IOProcessor::FrameEndOfFrameEvents()
{
ReportState(EIOProcessorEventStage::eFrameWorkDone);
for (auto &pumped : this->items.finalizeQueue)
{
if (!pumped->listener)
{
continue;
}
try
{
pumped->listener->Tick_FrameEpilogue();
}
catch (...)
{
SysPushErrorCatch();
pumped->FailWatch();
}
}
for (auto itr = this->items.crossThreadAbort.begin(); itr != this->items.crossThreadAbort.end(); )
{
bool fatal = itr->second;
auto item = itr->first;
this->ClearProcessor(item, fatal);
itr = this->items.crossThreadAbort.erase(itr);
}
}
void IOProcessor::ClearProcessor(const AuSPtr<IOProcessorItem> &processor, bool fatal)
{
if (!AuTryRemove(this->items.allItems, processor))
{
return;
}
try
{
if (fatal)
{
processor->listener->OnFailureCompletion();
}
else
{
processor->listener->OnNominalCompletion();
}
}
catch (...)
{
SysPushErrorCatch();
}
auto item = processor->item;
if (item->IsRunOnTick())
{
if (!AuTryRemove(this->items.onTickReceivers, processor))
{
SysPushErrorMem();
}
}
if (item->IsRunOnOtherTick())
{
if (!AuTryRemove(this->items.onOtherReceivers, processor))
{
SysPushErrorMem();
}
}
if (item->IsRunOnSelfIO())
{
if (!AuTryRemove(this->items.registeredIO, processor))
{
SysPushErrorMem();
}
auto src = item->GetSelfIOSource();
if (!src)
{
SysPushErrorGeneric();
return;
}
if (!this->ToQueue()->SourceRemove(src))
{
SysPushErrorNested("IO Remove Error [!!!]");
}
}
}
AuUInt IOProcessor::FrameFinalize()
{
auto C = this->items.finalizeQueue.size();
this->items.workSignaled.clear();
this->items.finalizeQueue.clear();
ReportState(EIOProcessorEventStage::eFrameEndOfFrame);
this->bFrameStart = false;
return C;
}
bool IOProcessor::InternalIsTickReady()
{
// TODO: tickless io ad-hoc event-as-condvar?
return this->items.workSignaled.size() ||
this->items.crossThreadAbort.size();
}
bool IOProcessor::FrameWaitForAny(AuUInt32 msMax)
{
if (InternalIsTickReady())
{
return true;
}
if (this->IsTickOnly())
{
auto next = this->timers.nbTicker.nextTriggerTime;
auto now = AuTime::CurrentInternalClockNS();
if (now >= next)
{
return true;
}
auto delay = AuNSToMS<AuUInt>(next - now);
#if 0
if (delay == 1)
{
bool ret {};
while (IO::IOYield());
}
else
#endif
{
return IO::WaitFor(msMax ? AuMin<AuUInt32>(msMax, delay) : delay, true);
}
}
else
{
return this->loopQueue->WaitAny(msMax);
}
}
bool IOProcessor::AddEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener)
{
AU_LOCK_GUARD(this->listenersSpinLock);
return AuTryInsert(this->listeners, eventListener);
}
void IOProcessor::RemoveEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener)
{
AU_LOCK_GUARD(this->listenersSpinLock);
AuTryRemove(this->listeners, eventListener);
}
void IOProcessor::DispatchFrame(ProcessInfo &info)
{
this->ManualTick();
auto ns = this->refreshRateNs;
if (ns)
{
info = AuAsync::ETickType::eSchedule;
info.reschedNs = ns;
}
}
void IOProcessor::FrameStart()
{
ReportState(EIOProcessorEventStage::eFrameStartOfFrame);
this->bFrameStart = true;
auto blocked = this->items.GetBlockedSignals();
if (blocked.size())
{
AU_LOCK_GUARD(this->items.mutex);
this->items.workSignaled.insert(this->items.workSignaled.end(), blocked.begin(), blocked.end());
}
}
void IOProcessor::FrameRunThreadIO()
{
ReportState(EIOProcessorEventStage::eFrameYieldIO);
while (IO::IOYield());
ReportState(EIOProcessorEventStage::eFrameIODone);
}
AuUInt64 IOProcessor::SetRefreshRate(AuUInt64 ns)
{
if (!CheckThread())
{
AU_THROW_STRING("Wrong Thread");
}
bool changed = bool(ns) != bool(this->refreshRateNs);
auto old = AuExchange(this->refreshRateNs, ns);
UpdateTimers();
if (changed)
{
if (ns)
{
AddTimer();
}
else
{
RemoveTimer();
}
}
return old;
}
bool IOProcessor::HasRefreshRate()
{
return bool(this->refreshRateNs);
}
bool IOProcessor::MultiplexRefreshRateWithIOEvents()
{
return this->mutliplexIOAndTimer;
}
AuUInt64 IOProcessor::GetOwnedThreadId()
{
return this->threadId;
}
void IOProcessor::UpdateTimers()
{
this->timers.lsTicker->UpdateTickRateIfAnyNs(this->refreshRateNs);
this->timers.nbTicker.nsTimeStep = this->refreshRateNs;
if (!this->timers.nbTicker.nextTriggerTime)
{
this->timers.nbTicker.nextTriggerTime = AuTime::CurrentInternalClockNS() + this->timers.nbTicker.nsTimeStep;
}
}
void IOProcessor::AddTimer()
{
if (IsAsync())
{
StartAsyncTimerIfAny();
}
else
{
AddTimerLS();
}
}
void IOProcessor::AddTimerLS()
{
this->ToQueue()->SourceAdd(this->timers.lsTicker);
this->ToQueue()->AddCallback(this->timers.lsTicker, AuSPtr<AuLoop::ILoopSourceSubscriber>(AuSharedFromThis(), &this->timers));
this->ToQueue()->Commit();
}
void IOProcessor::StartAsyncTimerIfAny()
{
if (!this->workItem)
{
this->workItem = this->asyncWorker.pool->NewWorkItem(this->asyncWorker, AuSharedFromThis());
}
this->workItem->SetSchedTimeNs(this->refreshRateNs);
this->workItem->Dispatch();
}
void IOProcessor::RemoveTimer()
{
if (IsAsync())
{
CancelWorkItem();
}
else
{
RemoveLSTimer();
}
}
void IOProcessor::CancelWorkItem()
{
this->workItem->Cancel();
this->workItem = {};
}
void IOProcessor::RemoveLSTimer()
{
this->ToQueue()->SourceRemove(this->timers.lsTicker);
}
bool IOProcessor::IsAsync()
{
return bool(this->asyncWorker.pool);
}
bool IOProcessor::IsTickOnly()
{
return !this->mutliplexIOAndTimer &&
this->timers.nbTicker.nextTriggerTime;
}
bool IOProcessor::RequestRemovalForItemFromAnyThread(const AuSPtr<IIOProcessorItem> &processor)
{
return {};
}
AuSPtr<IIOProcessorItem> IOProcessor::StartIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener)
{
if (!CheckThread())
{
AU_THROW_STRING("Wrong Thread");
}
auto item = AuMakeShared<IOProcessorItem>();
item->parent = this;
item->listener = listener;
item->item = object;
AU_LOCK_GUARD(this->items.mutex);
this->items.allItems.push_back(item);
if (object->IsRunOnTick())
{
if (!AuTryInsert(this->items.onTickReceivers, item))
{
SysPushErrorMem();
return {};
}
}
if (object->IsRunOnOtherTick())
{
if (!AuTryInsert(this->items.onOtherReceivers, item))
{
SysPushErrorMem();
return {};
}
}
if (object->IsRunOnSelfIO())
{
auto src = object->GetSelfIOSource();
if (!src)
{
SysPushErrorGeneric();
return {};
}
if (!this->ToQueue()->SourceAdd(src))
{
SysPushErrorNested("IO Error");
return {};
}
if (!this->ToQueue()->AddCallback(src, item))
{
SysPushErrorNested("IO Error");
SysAssert(this->ToQueue()->SourceRemove(src));
return {};
}
if (!this->ToQueue()->Commit())
{
SysPushErrorIO("midframe recommit");
}
if (!AuTryInsert(this->items.registeredIO, item))
{
SysPushErrorMem();
SysAssert(this->ToQueue()->SourceRemove(src));
return {};
}
}
return item;
}
AuSPtr<IIOPipeProcessor> IOProcessor::ToPipeProcessor()
{
if (!CheckThread())
{
AU_THROW_STRING("Wrong Thread");
}
return {};
}
AuSPtr<Loop::ILoopQueue> IOProcessor::ToQueue()
{
return this->loopQueue;
}
void IOProcessor::ReleaseAllWatches()
{
}
bool IOProcessor::HasItems()
{
return bool(this->items.allItems.size());
}
void IOProcessor::ReportState(EIOProcessorEventStage stage)
{
AU_LOCK_GUARD(this->listenersSpinLock);
for (auto &listeners : this->listeners)
{
try
{
listeners->OnIOProcessorStateEvent(stage);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
bool IOProcessor::CheckThread()
{
if (this->threadId == 0)
{
this->threadId = AuThreads::GetThreadId();
return true;
}
return this->threadId == AuThreads::GetThreadId();
}
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessor(bool tickOnly, const AuSPtr<Loop::ILoopQueue> &queue)
{
auto processor = AuMakeShared<IOProcessor>(0, tickOnly, AuAsync::WorkerPId_t {}, queue);
if (!processor)
{
SysPushErrorMem();
return {};
}
if (!processor->Init())
{
SysPushErrorNested();
return {};
}
return processor;
}
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id)
{
if (!id.pool)
{
return {};
}
auto thread = id.pool->ResolveHandle(id);
if (!thread)
{
SysPushErrorGeneric("Worker PID failed to resolve to a thread object");
return {};
}
auto queue = id.pool->ToKernelWorkQueue(id);
if (!queue)
{
SysPushErrorGeneric("Worker PID has no kernel work queue");
return {};
}
auto processor = AuMakeShared<IOProcessor>(AuUInt(thread.get()), tickOnly, id, queue);
if (!processor)
{
SysPushErrorMem();
return {};
}
if (!processor->Init())
{
SysPushErrorNested();
return {};
}
return processor;
}
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorNoQueue(bool tickOnly)
{
auto loop = AuLoop::NewLoopQueue();
if (!loop)
{
SysPushErrorMem();
return {};
}
return NewIOProcessor(tickOnly, loop);
}
}

107
Source/IO/IOProcessor.hpp Normal file
View File

@ -0,0 +1,107 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessor.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
#include "IOProcessorItems.hpp"
#include "IOProcessorTimers.hpp"
namespace Aurora::IO
{
struct IOProcessor : IIOProcessor, AuEnableSharedFromThis<IOProcessor>, AuAsync::IWorkItemHandler
{
~IOProcessor();
IOProcessor(AuUInt threadId, bool tickOnly, AuAsync::WorkerPId_t worker, const AuSPtr<Loop::ILoopQueue> &loop);
bool Init();
bool QueueIOEvent(const AuSPtr<IIOProcessorItem> &ioEvent);
AuUInt32 TryTick() override;
AuUInt32 RunTick() override;
AuUInt32 TickFor(const AuSPtr<IIOProcessorItem> &ioEvent);
bool TickForRegister(const AuSPtr<IIOProcessorItem> &ioEvent);
AuUInt32 ManualTick() override;
void DispatchFrame(ProcessInfo &info) override;
bool AddEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener) override;
void RemoveEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener) override;
void FrameStart();
bool FrameWaitForAny(AuUInt32 msMax);
AuUInt FrameRunEpilogue();
void FrameRunThreadIO();
void FrameRunCheckLSes();
void FrameRunAlerted();
void FrameRunAlertedSniffers();
void FrameEndOfFrameEvents();
AuUInt FrameFinalize();
bool InternalIsTickReady();
void ClearProcessor(const AuSPtr<IOProcessorItem> &processor, bool fatal);
void ReportState(EIOProcessorEventStage stage);
AuUInt64 SetRefreshRate(AuUInt64 ns) override;
bool HasRefreshRate() override;
AuUInt64 GetOwnedThreadId() override;
bool MultiplexRefreshRateWithIOEvents() override;
AuSPtr<IIOProcessorItem> StartIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener) override;
AuSPtr<IIOPipeProcessor> ToPipeProcessor() override;
AuSPtr<Loop::ILoopQueue> ToQueue() override;
void ReleaseAllWatches() override;
bool HasItems() override;
bool CheckThread();
bool RequestRemovalForItemFromAnyThread(const AuSPtr<IIOProcessorItem> &processor);
AuSPtr<Loop::ILoopQueue> loopQueue;
AuUInt threadId;
AuUInt64 refreshRateNs {};
AuUInt64 minFrameDeltaNs {};
void UpdateTimers();
void AddTimer();
void AddTimerLS();
void StartAsyncTimerIfAny();
void RemoveTimer();
void CancelWorkItem();
void RemoveLSTimer();
bool IsTickOnly();
bool IsAsync();
bool mutliplexIOAndTimer {true};
IOProcessorItems items;
IOProcessorTimers timers;
AuAsync::WorkerPId_t asyncWorker;
AuSPtr<AuAsync::IWorkItem> workItem;
AuThreadPrimitives::MutexUnique_t mutex;
AuThreadPrimitives::SpinLock listenersSpinLock;
AuList<AuSPtr<IIOProcessorEventListener>> listeners;
bool bFrameStart {};
};
}

View File

@ -0,0 +1,76 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessorItem.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOProcessorItem.hpp"
#include "IOProcessor.hpp"
namespace Aurora::IO
{
bool IOProcessorItem::StopWatch()
{
if (this->parent->CheckThread() &&
this->parent->bFrameStart)
{
this->parent->ClearProcessor(AuSharedFromThis(), false);
return false;
}
return this->parent->items.ScheduleFinish(AuSharedFromThis(), false);
}
bool IOProcessorItem::FailWatch()
{
if (this->parent->CheckThread() &&
this->parent->bFrameStart)
{
this->parent->ClearProcessor(AuSharedFromThis(), true);
return false;
}
return this->parent->items.ScheduleFinish(AuSharedFromThis(), true);
}
bool IOProcessorItem::OnFinished(const AuSPtr<AuLoop::ILoopSource> &source)
{
IOAlert(false);
return false;
}
void IOProcessorItem::InvokeManualTick()
{
IOAlert(true);
}
void IOProcessorItem::IOAlert(bool force)
{
if (!this->parent->IsTickOnly())
{
// We *should* be mid-yield in the frame
// We register the item for the later stages of the IO frame
this->parent->TickForRegister(AuSharedFromThis());
}
else if (force) // manual tick
{
if (this->parent->CheckThread())
{
this->parent->TickFor(AuSharedFromThis());
this->parent->ManualTick();
}
else if (parent->IsAsync())
{
// TODO:
}
else
{
SysPanic("Missing");
}
}
}
}

View File

@ -0,0 +1,35 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessorItem.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOProcessor;
struct IOProcessorItem : AuLoop::ILoopSourceSubscriber, IIOProcessorItem, AuEnableSharedFromThis<IOProcessorItem>
{
IOProcessor *parent;
AuSPtr<IIOWaitableItem> item;
AuSPtr<IIOEventListener> listener;
// ILoopSourceSubscriber
bool OnFinished(const AuSPtr<AuLoop::ILoopSource> &source) override;
// IIOProcessorItem
virtual bool StopWatch() override;
virtual bool FailWatch() override;
// IIOProcessorItem::...
void InvokeManualTick() override;
//
void IOAlert(bool force);
};
}

View File

@ -0,0 +1,51 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessorItems.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOProcessorItem.hpp"
#include "IOProcessorItems.hpp"
namespace Aurora::IO
{
bool IOProcessorItems::Init()
{
this->mutex = AuThreadPrimitives::CriticalSectionUnique();
this->mutex2 = AuThreadPrimitives::CriticalSectionUnique();
return bool(this->mutex) && bool(this->mutex2);
}
bool IOProcessorItems::AddFrameTemp(const AuSPtr<IOProcessorItem> &item)
{
AU_TRY_LOCK_GUARD_RET_DEF(this->mutex);
return AuTryInsert(this->workSignaled, item);
}
bool IOProcessorItems::AddFrameOrFallback(const AuSPtr<IOProcessorItem> &item)
{
if (!AddFrameTemp(item))
{
AU_LOCK_GUARD(this->mutex2);
return AuTryInsert(this->workSignaled2, item);
}
return true;
}
bool IOProcessorItems::ScheduleFinish(const AuSPtr<IOProcessorItem> &item, bool unsafe)
{
AU_TRY_LOCK_GUARD_RET_DEF(this->mutex);
return AuTryInsert(this->crossThreadAbort, AuMakePair(item, unsafe));
}
AuList<AuSPtr<IOProcessorItem>> IOProcessorItems::GetBlockedSignals()
{
AU_LOCK_GUARD(this->mutex2);
return AuExchange(this->workSignaled2, {});
}
}

View File

@ -0,0 +1,40 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessorItems.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOProcessor;
struct IOProcessorItem;
struct IOProcessorItems
{
AuList<AuSPtr<IOProcessorItem>> allItems;
AuList<AuSPtr<IOProcessorItem>> onTickReceivers;
AuList<AuSPtr<IOProcessorItem>> onOtherReceivers;
AuList<AuSPtr<IOProcessorItem>> registeredIO;
AuThreadPrimitives::CriticalSectionUnique_t mutex;
AuList<AuSPtr<IOProcessorItem>> workSignaled;
AuThreadPrimitives::CriticalSectionUnique_t mutex2;
AuList<AuSPtr<IOProcessorItem>> workSignaled2;
AuList<AuSPtr<IOProcessorItem>> finalizeQueue;
AuList<AuPair<AuSPtr<IOProcessorItem>, bool>> crossThreadAbort;
bool Init();
bool AddFrameTemp(const AuSPtr<IOProcessorItem> &item);
bool AddFrameOrFallback(const AuSPtr<IOProcessorItem> &item);
bool ScheduleFinish(const AuSPtr<IOProcessorItem> &item, bool unsafe);
AuList<AuSPtr<IOProcessorItem>> GetBlockedSignals();
};
}

View File

@ -0,0 +1,27 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessorTimers.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOProcessorTimers.hpp"
#include "IOProcessor.hpp"
namespace Aurora::IO
{
bool IOProcessorTimers::Init(IOProcessor *parent, AuSPtr<Loop::ITimer> lsTicker)
{
this->parent = parent;
this->lsTicker = lsTicker;
return bool(this->parent) && bool(lsTicker);
}
bool IOProcessorTimers::OnFinished(const AuSPtr<AuLoop::ILoopSource> &source)
{
this->parent->ManualTick();
return false;
}
}

View File

@ -0,0 +1,24 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOProcessorTimers.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOProcessor;
struct IOProcessorTimers : AuLoop::ILoopSourceSubscriber
{
IOProcessor *parent {};
AuSPtr<Loop::ITimer> lsTicker;
Utility::RateLimiter nbTicker;
bool Init(IOProcessor *parent, AuSPtr<Loop::ITimer> lsTicker);
bool OnFinished(const AuSPtr<AuLoop::ILoopSource> &source) override;
};
}

View File

@ -41,7 +41,7 @@ namespace Aurora::IO
sleep = 0;
}
if (!UNIX::LinuxOverlappedPoll(sleep, true))
if (!UNIX::LinuxOverlappedPoll(sleep))
{
continue;
}

View File

@ -58,4 +58,9 @@ namespace Aurora::IO
return bHit;
}
AUKN_SYM bool IOYield()
{
return SleepEx(0, true) == WAIT_IO_COMPLETION;
}
}

View File

@ -0,0 +1,73 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOWaitableIOLoopSource.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOWaitableIOLoopSource.hpp"
namespace Aurora::IO
{
IOWatachableIOLoopSource::IOWatachableIOLoopSource(const AuSPtr<Loop::ILoopSource> &source) : source(source)
{
}
bool IOWatachableIOLoopSource::IsRunOnOtherTick()
{
return {};
}
bool IOWatachableIOLoopSource::IsRunOnTick()
{
return {};
}
bool IOWatachableIOLoopSource::CanRequestTick()
{
return {};
}
void IOWatachableIOLoopSource::OnReportPumper(const AuSPtr<IIOProcessorManualInvoker> &iface)
{
}
bool IOWatachableIOLoopSource::IsRunOnSelfIO()
{
return true;
}
AuSPtr<Loop::ILoopSource> IOWatachableIOLoopSource::GetSelfIOSource()
{
return GetSelfIOSource();
}
bool IOWatachableIOLoopSource::ApplyRateLimit()
{
return {};
}
AuSPtr<Loop::ILoopSource> IOWatachableIOLoopSource::GetLoopSource()
{
return this->source;
}
AuSPtr<Loop::ILoopSource> IOWatachableIOLoopSource::SetLoopSource(const AuSPtr<Loop::ILoopSource> &ls)
{
return AuExchange(this->source, ls);
}
bool IOWatachableIOLoopSource::IsRunOnSelfIOCheckedOnTimerTick()
{
return true;
}
AUKN_SYM AuSPtr<IIOWatachableIOLoopSource> NewWaitableLoopSource(const AuSPtr<Loop::ILoopSource> &ptr)
{
return AuMakeShared<IOWatachableIOLoopSource>(ptr);
}
}

View File

@ -0,0 +1,33 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOWaitableIOLoopSource.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOWatachableIOLoopSource : IIOWatachableIOLoopSource
{
IOWatachableIOLoopSource(const AuSPtr<Loop::ILoopSource> &source);
bool IsRunOnOtherTick() override;
bool IsRunOnTick() override;
bool CanRequestTick() override;
void OnReportPumper(const AuSPtr<IIOProcessorManualInvoker> &iface) override;
bool IsRunOnSelfIO() override;
AuSPtr<Loop::ILoopSource> GetSelfIOSource() override;
bool ApplyRateLimit() override;
AuSPtr<Loop::ILoopSource> GetLoopSource() override;
AuSPtr<Loop::ILoopSource> SetLoopSource(const AuSPtr<Loop::ILoopSource> &ls) override;
bool IsRunOnSelfIOCheckedOnTimerTick() override;
AuSPtr<Loop::ILoopSource> source;
};
}

View File

@ -0,0 +1,97 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOWaitableIOTimer.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOWaitableIOTimer.hpp"
namespace Aurora::IO
{
IOWaitableIOTimer::IOWaitableIOTimer()
{
}
bool IOWaitableIOTimer::IsValid()
{
return bool(this->source);
}
bool IOWaitableIOTimer::IsRunOnOtherTick()
{
return {};
}
bool IOWaitableIOTimer::IsRunOnTick()
{
return {};
}
bool IOWaitableIOTimer::CanRequestTick()
{
return {};
}
void IOWaitableIOTimer::OnReportPumper(const AuSPtr<IIOProcessorManualInvoker> &iface)
{
}
bool IOWaitableIOTimer::IsRunOnSelfIO()
{
return true;
}
AuSPtr<Loop::ILoopSource> IOWaitableIOTimer::GetSelfIOSource()
{
return this->source;
}
bool IOWaitableIOTimer::IsRunOnSelfIOCheckedOnTimerTick()
{
return true;
}
bool IOWaitableIOTimer::ApplyRateLimit()
{
return false;
}
AuUInt64 IOWaitableIOTimer::SetConstantTick(AuUInt64 ns)
{
auto old = AuExchange(this->constantTickNs, ns);
this->source->UpdateTickRateIfAnyNs(ns);
return old;
}
AuUInt64 IOWaitableIOTimer::SetTargetTimeAbs(AuUInt64 ns)
{
auto old = AuExchange(this->targetTimeAbsNs, ns);
this->source->UpdateTimeNs(ns);
return old;
}
AUKN_SYM AuSPtr<IIOWaitableIOTimer> NewWaitableIOTimer()
{
auto timer = AuMakeShared<IOWaitableIOTimer>();
if (!timer)
{
SysPushErrorMem();
return {};
}
timer->source = AuLoop::NewLSTimer(0);
if (!timer->IsValid())
{
SysPushErrorMem();
return {};
}
return timer;
}
}

View File

@ -0,0 +1,37 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOWaitableIOTimer.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOWaitableIOTimer : IIOWaitableIOTimer
{
IOWaitableIOTimer();
bool IsValid();
bool IsRunOnOtherTick() override;
bool IsRunOnTick() override;
bool CanRequestTick() override;
void OnReportPumper(const AuSPtr<IIOProcessorManualInvoker> &iface) override;
bool IsRunOnSelfIO() override;
AuSPtr<Loop::ILoopSource> GetSelfIOSource() override;
bool IsRunOnSelfIOCheckedOnTimerTick() override;
bool ApplyRateLimit() override;
AuUInt64 SetConstantTick(AuUInt64 ns) override;
AuUInt64 SetTargetTimeAbs(AuUInt64 ns) override;
AuSPtr<Loop::ITimer> source;
AuUInt64 constantTickNs {};
AuUInt64 targetTimeAbsNs {};
};
}

View File

@ -0,0 +1,15 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOWaitableTickLimiter.cpp
Date: 2022-6-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOWaitableTickLimiter.hpp"
namespace Aurora::IO
{
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOWaitableTickLimiter.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
}

View File

@ -52,6 +52,7 @@ namespace Aurora::Loop
this->reschedStepNsOrZero_ = AuMSToNS<AuUInt64>(reschedStepMsOrZero);
this->maxIterationsOrZero_ = maxIterationsOrZero;
this->count_ = 0;
UpdateTimeInternal(this->targetTime_);
}
void LSTimer::UpdateTickRateIfAnyNs(AuUInt64 reschedStepNsOrZero, AuUInt32 maxIterationsOrZero)
@ -59,6 +60,7 @@ namespace Aurora::Loop
this->reschedStepNsOrZero_ = reschedStepNsOrZero;
this->maxIterationsOrZero_ = maxIterationsOrZero;
this->count_ = 0;
UpdateTimeInternal(this->targetTime_);
}
bool LSTimer::OnTrigger(AuUInt handle)

View File

@ -57,6 +57,8 @@
#endif
#define GIMME_IOWAITABLEITEM
#include <AuroraRuntime.hpp>
inline Aurora::RuntimeStartInfo gRuntimeConfig;