[+] IIOSimpleEventListener

[+] IOProcessor::StartSimpleIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener)
[+] IOProcessor::StartSimpleLSWatch(const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener)
[*] IOAdapterAsyncStream should reset the transactions IO state upon reaching end of segment (should this be per tick?) or upon stream error - otherwise, we end up spinning on a stuck event forever
[*] Fix non-linear path under read of the AuByteBuffer
[*] Fix various other nonlinear conditions under AuByteBuffer
[*] IOProcessor releases registered io item from queue upon request
[*] Fix ConsoleMessage::Write -> enumeration of color should be casted to a uint8
[+] Error telemetry under async task creation
[*] Fix various lock ups and non-blocking spins related to erroneous InternalRunOne impl. Residual preemptive batching was fucking with modern io.
[*] Cleanup TaskFrom/JobFrom. More work required to clean up legacy piss
This commit is contained in:
Reece Wilson 2022-06-22 14:42:17 +01:00
parent 7ea75b4014
commit 0c3344fe46
20 changed files with 214 additions and 39 deletions

View File

@ -21,7 +21,7 @@ namespace Aurora::Async
}
template<class Info_t = AVoid, class Result_t = AVoid, class Callable_t>
static inline FJob<Info_t, Result_t> JobFromResultConsumer(/*AuConsumer<const Result_t &> */ Callable_t&&onSuccess)
static inline FJob<Info_t, Result_t> JobFromResultConsumer(/*AuConsumer<const Result_t &> */ Callable_t onSuccess)
{
FJob<Info_t, Result_t> ret;
ret.onSuccess = [=](const Info_t &in, const Result_t &a)
@ -32,7 +32,7 @@ namespace Aurora::Async
}
template<typename Out_t = AVoid, typename ... Args, typename Callable_t>
FJob<AuTuple<Args...>, Out_t> JobFromTupleConsumer(/*AuConsumer<Args..., const Result_t &> */ Callable_t &&onSuccess)
FJob<AuTuple<Args...>, Out_t> JobFromTupleConsumer(/*AuConsumer<Args..., const Result_t &> */ Callable_t onSuccess)
{
FJob<AuTuple<Args...>, Out_t> ret;
ret.onSuccess = [=](const AuTuple<Args...> &in, const Out_t &a)
@ -43,7 +43,7 @@ namespace Aurora::Async
}
template<typename Out_t = AVoid, typename ... Args, typename Callable_t, typename FailureCallable_t>
FJob<AuTuple<Args...>, Out_t> JobFromTupleConsumerEx(/*AuConsumer<Args..., const Result_t &> */ Callable_t &&onSuccess, FailureCallable_t &&onFailure)
FJob<AuTuple<Args...>, Out_t> JobFromTupleConsumerEx(/*AuConsumer<Args..., const Result_t &> */ Callable_t onSuccess, FailureCallable_t onFailure)
{
FJob<AuTuple<Args...>, Out_t> ret;
ret.onSuccess = [=](const AuTuple<Args...> &in, const Out_t &a)

View File

@ -10,37 +10,37 @@
namespace Aurora::Async
{
template<typename Info_t = AVoid, typename Out_t = AVoid, class ClazzImpl>
FTask<Info_t, Out_t> TaskFromConsumerRefT(ClazzImpl &&func)
FTask<Info_t, Out_t> TaskFromConsumerRefT(ClazzImpl func)
{
FTask<Info_t, Out_t> ret;
ret.onFrame = [callable = func](const Info_t &in) -> Out_t
ret.onFrame = [=](const Info_t &in) -> Out_t
{
if constexpr (AuIsSame_v<Out_t, AVoid>)
{
callable(in);
func(in);
return {};
}
else
{
return callable(in);
return func(in);
}
};
return ret;
}
template<typename Out_t = AVoid, typename ... Args, typename Functor>
FTask<AuTuple<Args...>, Out_t> TaskFromTupleCallable(Functor &&func)
FTask<AuTuple<Args...>, Out_t> TaskFromTupleCallable(Functor func)
{
FTask<AuTuple<Args...>, Out_t> ret;
ret.onFrame = [callable = func](const AuTuple<Args...> &in) -> Out_t
ret.onFrame = [=](const AuTuple<Args...> &in) -> Out_t
{
return AuTupleApply(callable, in);
return AuTupleApply(func, in);
};
return ret;
}
template<typename Out_t = AVoid, typename Owner_t, typename ... Args>
FTask<AuTuple<Args...>, Out_t> TaskFromTupleCallableWithOwnerArg(AuFunction<Out_t(Args...)> &&func, const Owner_t &ownerToPin)
FTask<AuTuple<Args...>, Out_t> TaskFromTupleCallableWithOwnerArg(AuFunction<Out_t(Args...)> func, const Owner_t &ownerToPin)
{
FTask<AuTuple<Owner_t, Args...>, Out_t> ret;
ret.onFrame = [ownerToPin, callable = func](const AuTuple<Args...> &in) -> Out_t
@ -51,12 +51,12 @@ namespace Aurora::Async
}
template<typename Task_t, typename ReturnValue_t, typename ... Args, typename Functor>
Task_t TaskFromTupleCallableWithBindOwner(Functor &&func)
Task_t TaskFromTupleCallableWithBindOwner(Functor func)
{
Task_t ret;
ret.onFrame = [callable = func](const auto &in) -> ReturnValue_t
ret.onFrame = [=](const auto &in) -> ReturnValue_t
{
return AuTupleApply(callable, AuTuplePopFront(in));
return AuTupleApply(func, AuTuplePopFront(in));
};
return ret;
}
@ -65,9 +65,9 @@ namespace Aurora::Async
Task_t TaskFromTupleCallableWithBindOwner2(Functor func)
{
Task_t ret;
ret.onFrame = [callable = func](const auto &in) -> ReturnValue_t
ret.onFrame = [=](const auto &in) -> ReturnValue_t
{
return AuTupleApply(callable, AuTuplePopFront(in));
return AuTupleApply(func, AuTuplePopFront(in));
};
return ret;
}

View File

@ -10,7 +10,7 @@
namespace Aurora::IO
{
AUKN_INTERFACE(IIOEventListener,
AUI_METHOD(void, Tick_RunOnTick, ()), // called on each tick, if registered by the iWaitableitem interface
AUI_METHOD(void, Tick_RunOnTick, ()), // called on each tick, if registered by the IWaitableItem interface
AUI_METHOD(void, Tick_OtherIOEvent, ()), // called if a different object woke us up
AUI_METHOD(void, Tick_SelfIOEvent, ()), // called if a ticker, io loop source, or other self-iWaitableitem interface poked us

View File

@ -25,6 +25,8 @@ namespace Aurora::IO
virtual AuUInt64 GetOwnedThreadId () = 0;
virtual AuSPtr<IIOProcessorItem> StartIOWatch (const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener) = 0;
virtual AuSPtr<IIOProcessorItem> StartSimpleIOWatch (const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener) = 0;
virtual AuSPtr<IIOProcessorItem> StartSimpleLSWatch (const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener) = 0;
virtual bool SubmitIOWorkItem (const AuSPtr<IIOProcessorWorkUnit> &work) = 0;

View File

@ -0,0 +1,19 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOSimpleEventListener.hpp
Date: 2022-6-22
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_INTERFACE(IIOSimpleEventListener,
AUI_METHOD(void, OnIOTick, ()),
AUI_METHOD(void, OnIOFailure, ()),
AUI_METHOD(void, OnIOComplete, ())
);
AUKN_SYM AuSPtr<IIOEventListener> DesimplifyIOEventListenerAdapater(const AuSPtr<IIOSimpleEventListener> &interface);
}

View File

@ -13,6 +13,7 @@
#include "IIOWaitableItem.hpp"
#include "IIOEventListener.hpp"
#include "IIOSimpleEventListener.hpp"
#include "EIOProcessorEventStage.hpp"
#include "IIOPipeEventListener.hpp"

View File

@ -59,6 +59,10 @@ namespace Aurora::Memory
{
linearOverhead = readPtr - writePtr;
}
else if (writePtr == readPtr)
{
linearOverhead = 0;
}
else
{
linearOverhead = length - (writePtr - base);
@ -134,7 +138,11 @@ namespace Aurora::Memory
{
if (flagCircular)
{
if ((readPtr < writePtr) && (endAtWrite))
if (readPtr == writePtr)
{
return 0;
}
else if ((readPtr < writePtr) && (endAtWrite))
{
return writePtr - readPtr;
}
@ -169,7 +177,11 @@ namespace Aurora::Memory
{
if (flagCircular)
{
if ((writePtr < readPtr) && (endAtRead))
if (readPtr == writePtr)
{
return length;
}
else if ((writePtr < readPtr) && (endAtRead))
{
return readPtr - writePtr;
}

View File

@ -113,7 +113,12 @@ namespace Aurora::Memory
AuUInt linearOverhead = 0, toWriteOverhead = 0, linearReadable = 0, toWriteReadable = 0;
if (flagCircular)
{
if (readPtr < writePtr)
if (readPtr == writePtr)
{
linearOverhead = 0;
toWriteOverhead = 0;
}
else if (readPtr < writePtr)
{
linearOverhead = writePtr - readPtr;
toWriteOverhead = 0;
@ -131,12 +136,12 @@ namespace Aurora::Memory
if (out)
{
AuMemcpy(out, readPtr, linearOverhead);
AuMemcpy(out, readPtr, linearReadable);
}
if (!peek)
{
readPtr += linearOverhead;
readPtr += linearReadable;
}
if (toWriteOverhead)

View File

@ -252,7 +252,7 @@ namespace Aurora::Async
bool success {};
auto runMode = GetCurrentThreadRunMode();
do
//do
{
auto asyncLoop = state->asyncLoop;
@ -304,11 +304,12 @@ namespace Aurora::Async
}
if (bShouldTrySleepForKernel
// epoll and such like can be checked without read success. kevent works on availablity, not scheduling read like iosubmit, too.
// epoll and such like can be checked without read success. kevent works on availablity, not scheduling read like iosubmit
// allow windows to atomically pump instead of wasting time buffering the primitives state
#if defined(AURORA_PLATFORM_WIN32)
&& asyncLoop->WaitAny(0)
#endif
&& ((AuBuild::kIsNtDerived && runMode == ERunMode::eEfficient) ||
(!AuBuild::kIsNtDerived))
&& (asyncLoop->WaitAny(0))
)
{
PollInternal(block);
@ -323,7 +324,7 @@ namespace Aurora::Async
{
success = PollInternal(block);
}
} while (success);
} //while (success);
return success;
}
@ -946,6 +947,7 @@ namespace Aurora::Async
threadState->id = workerId;
threadState->asyncLoop = AuMakeShared<AsyncLoop>();
threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
threadState->runMode = ERunMode::eEfficient;
if (!threadState->asyncLoop)
{

View File

@ -384,6 +384,7 @@ namespace Aurora::Async
{
if (!task)
{
SysPushErrorArg("WorkItem has null task. Running out of memory?");
return {};
}
@ -394,11 +395,13 @@ namespace Aurora::Async
{
if (!task)
{
SysPushErrorArg("WorkItem has null task. Running out of memory?");
return {};
}
if (!worker.pool)
{
SysPushErrorArg("WorkItem has null pool");
return {};
}

View File

@ -90,7 +90,7 @@ namespace Aurora::Console::Commands
if (type == EDispatchType::eSys)
{
gPendingCommands.push_back(CommandDispatch(res.result, cmdEntry.callback));
return AuTryInsert(gPendingCommands, CommandDispatch(res.result, cmdEntry.callback));
}
else
{
@ -122,12 +122,13 @@ namespace Aurora::Console::Commands
AUKN_SYM void AddCommand(const AuString &tag, const Parse::ParseObject &commandStructure, const AuSPtr<ICommandSubscriber> &callback)
{
AU_LOCK_GUARD(gPendingCommandsMutex);
SysAssert(callback);
gCommands.insert(AuMakePair(tag, Command(tag, commandStructure, callback)));
}
AUKN_SYM bool DispatchCommand(const AuString &string)
{
return Dispatch(string, EDispatchType::eSys, {});
return Dispatch(string, EDispatchType::eSys, gCommandDispatcher);
}
AUKN_SYM bool DispatchCommandThisThread(const AuString &string)

View File

@ -47,7 +47,7 @@ namespace Aurora::Console
AUKN_SYM void ConsoleMessage::Write(Memory::ByteBuffer &serialize) const
{
serialize.Write(static_cast<EAnsiColor>(color));
serialize.Write(static_cast<AuUInt8>(color));
serialize.Write(prefix);
serialize.Write(line);
serialize.Write(time);

View File

@ -717,7 +717,7 @@ namespace Aurora::Console::ConsoleTTY
int XOffset = GetLeftBorder() + this->leftLogPadding;
auto itr = str.npos;
itr = str.find('\t', itr);
itr = str.find('\t');
while (itr != str.npos)
{

View File

@ -274,6 +274,7 @@ namespace Aurora::IO
if (code != EStreamError::eErrorNone)
{
parent->transaction->Reset();
return code;
}
}
@ -305,6 +306,11 @@ namespace Aurora::IO
{
parent->readOffset += out.length;
}
if (parent->lastAllocation->streamIndex == length)
{
parent->transaction->Reset();
}
}
return EStreamError::eErrorNone;

View File

@ -407,9 +407,9 @@ namespace Aurora::IO
this->bShouldReadNext = true;
}
if (this->output.forwardStream.bFlushWriter)
if (this->output.type == EPipeCallbackType::eWriteToWriter)
{
if (this->output.type == EPipeCallbackType::eWriteToWriter)
if (this->output.forwardStream.bFlushWriter)
{
this->output.forwardStream.writer->Flush();
}

View File

@ -690,6 +690,30 @@ namespace Aurora::IO
return item;
}
AuSPtr<IIOProcessorItem> IOProcessor::StartSimpleIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener)
{
auto adapater = DesimplifyIOEventListenerAdapater(listener);
if (!adapater)
{
SysPushErrorMem();
return {};
}
return this->StartIOWatch(object, adapater);
}
AuSPtr<IIOProcessorItem> IOProcessor::StartSimpleLSWatch(const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener)
{
auto sourceAdapter = NewWaitableLoopSource(source);
if (!sourceAdapter)
{
SysPushErrorMem();
return {};
}
return this->StartSimpleIOWatch(sourceAdapter, listener);
}
AuSPtr<IIOPipeProcessor> IOProcessor::ToPipeProcessor()
{
if (!CheckThread())
@ -716,6 +740,11 @@ namespace Aurora::IO
queue->SourceRemove(this->items.cvEvent);
}
for (auto &io : this->items.registeredIO)
{
queue->SourceRemove(io->item->GetSelfIOSource());
}
queue->Commit();
}
}

View File

@ -64,6 +64,8 @@ namespace Aurora::IO
bool MultiplexRefreshRateWithIOEvents() override;
AuSPtr<IIOProcessorItem> StartIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener) override;
AuSPtr<IIOProcessorItem> StartSimpleIOWatch(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener) override;
AuSPtr<IIOProcessorItem> StartSimpleLSWatch(const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener) override;
AuSPtr<IIOPipeProcessor> ToPipeProcessor() override;

View File

@ -0,0 +1,63 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOSimpleEventListener.cpp
Date: 2022-6-22
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "IOSimpleEventListener.hpp"
namespace Aurora::IO
{
IOSimpleEventListener::IOSimpleEventListener(const AuSPtr<IIOSimpleEventListener> &parent) : parent(parent)
{
}
void IOSimpleEventListener::Tick_RunOnTick()
{
}
void IOSimpleEventListener::Tick_OtherIOEvent()
{
}
void IOSimpleEventListener::Tick_SelfIOEvent()
{
}
void IOSimpleEventListener::Tick_Any()
{
this->parent->OnIOTick();
}
void IOSimpleEventListener::Tick_FrameEpilogue()
{
}
void IOSimpleEventListener::OnFailureCompletion()
{
this->parent->OnIOFailure();
}
void IOSimpleEventListener::OnNominalCompletion()
{
this->parent->OnIOComplete();
}
AuSPtr<IIOEventListener> DesimplifyIOEventListenerAdapater(const AuSPtr<IIOSimpleEventListener> &interface)
{
if (!interface)
{
return {};
}
return AuMakeShared<IOSimpleEventListener>(interface);
}
}

View File

@ -0,0 +1,26 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOSimpleEventListener.hpp
Date: 2022-6-22
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOSimpleEventListener : IIOEventListener
{
IOSimpleEventListener(const AuSPtr<IIOSimpleEventListener> &parent);
void Tick_RunOnTick() override;
void Tick_OtherIOEvent() override;
void Tick_SelfIOEvent() override;
void Tick_Any() override;
void Tick_FrameEpilogue() override;
void OnFailureCompletion() override;
void OnNominalCompletion() override;
AuSPtr<IIOSimpleEventListener> parent;
};
}

View File

@ -22,7 +22,7 @@ namespace Aurora::IO::IPC
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct IPCPipeImpl;
struct IPCHasConnectionEvent : Loop::LSEvent
struct IPCHasConnectionEvent : Loop::LSHandle
{
IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent);
@ -56,22 +56,26 @@ namespace Aurora::IO::IPC
Loop::ELoopSource GetType() override;
HANDLE GetPipeHandle();
HANDLE GetConnectHandle()
{
return (HANDLE)AuStaticCast<Loop::LSEvent>(this->hasClient_)->GetHandle();
}
void TryConnect();
OVERLAPPED overlapped {};
AuSPtr<Loop::ILSEvent> hasClient_;
private:
HANDLE serverHandle_ {INVALID_HANDLE_VALUE};
HANDLE clientHandle_ {INVALID_HANDLE_VALUE};
IPCHandle ipcHandle_;
AuSPtr<IO::FS::FileHandle> fsHandle_;
AuSPtr<IO::FS::NtAsyncFileStream> fsStream_;
AuSPtr<Loop::ILSEvent> hasClient_;
AuSPtr<IPCHasConnectionEvent> lshasConnection_;
bool bFirstTime {true};
};
IPCHasConnectionEvent::IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent) : parent_(parent), LSEvent(false, false, true)
IPCHasConnectionEvent::IPCHasConnectionEvent(AuSPtr<IPCPipeImpl> parent) : parent_(parent), LSHandle((AuUInt)parent->GetConnectHandle())
{
}
@ -126,7 +130,7 @@ namespace Aurora::IO::IPC
return;
}
this->overlapped.hEvent = (HANDLE)AuStaticCast<Loop::LSEvent>(this->hasClient_)->GetHandle();
this->overlapped.hEvent = GetConnectHandle();
bool firstTime = AuExchange(bFirstTime, false);
if (firstTime ||
@ -175,7 +179,7 @@ namespace Aurora::IO::IPC
this->lshasConnection_ = AuMakeShared<IPCHasConnectionEvent>(AuSharedFromThis());
}
return AuStaticCast<Loop::ILSEvent>(this->lshasConnection_);
return this->lshasConnection_;
}
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelHasData()