[+] IAsyncTransaction::GetCompletionGroup

[*] IO improvements
This commit is contained in:
Reece Wilson 2024-01-06 04:18:13 +00:00
parent 4a4f4e9608
commit 3766ea8b86
19 changed files with 288 additions and 33 deletions

View File

@ -97,6 +97,11 @@ namespace Aurora::IO
return nullptr; return nullptr;
} }
inline virtual AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup()
{
return nullptr;
}
AURT_ADD_USR_DATA; AURT_ADD_USR_DATA;
}; };
} }

View File

@ -1,20 +0,0 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IIOWaitableTickLimiter.hpp
Date: 2022-6-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IIOWaitableTickLimiter : IIOWaitableItem
{
virtual AuUInt64 SetConstantTick(AuUInt64 ns) = 0;
virtual AuUInt64 SetMinTime(AuUInt64 ns) = 0;
virtual AuUInt32 SetMinTickDelta(AuUInt32 tickDelta) = 0;
};
AUKN_SYM AuSPtr<IIOWaitableTickLimiter> NewWaitableTickLimiter();
}

View File

@ -55,10 +55,11 @@
#include "Adapters/IOAdapterNOPs.hpp" #include "Adapters/IOAdapterNOPs.hpp"
#include "Adapters/IOAdapterZeros.hpp" #include "Adapters/IOAdapterZeros.hpp"
#include "IIOWaitableTickLimiter.hpp"
#include "IIOWaitableIOTimer.hpp" #include "IIOWaitableIOTimer.hpp"
#include "IIOWaitableIOLoopSource.hpp" #include "IIOWaitableIOLoopSource.hpp"
#include "IOPipeInterceptorNop.hpp" #include "IOPipeInterceptorNop.hpp"
#include "CompletionGroup/CompletionGroup.hpp" #include "CompletionGroup/CompletionGroup.hpp"
#include "IOWaitableIOCompletionGroup.hpp"

View File

@ -0,0 +1,14 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOWaitableIOCompletionGroup.hpp
Date: 2024-1-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_SYM AuSPtr<IIOWaitableItem> NewWaitableItemForSleepingCompletionGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup,
bool bAny = { true });
}

View File

@ -9,6 +9,7 @@
#include <Aurora/IO/IOExperimental.hpp> #include <Aurora/IO/IOExperimental.hpp>
#include "AuIOAdapterAsyncStream.hpp" #include "AuIOAdapterAsyncStream.hpp"
#include "../AuIOWaitableIOLoopSource.hpp" #include "../AuIOWaitableIOLoopSource.hpp"
#include "../AuIOWaitableIOCompletionGroup.hpp"
namespace Aurora::IO::Adapters namespace Aurora::IO::Adapters
{ {
@ -97,6 +98,7 @@ namespace Aurora::IO::Adapters
int locked {}; int locked {};
IOWatachableIOLoopSource source; IOWatachableIOLoopSource source;
IOWaitableIOCompletionGroup source2;
// impl // impl
AsyncStreamReader reader; AsyncStreamReader reader;
@ -621,14 +623,22 @@ namespace Aurora::IO::Adapters
AuSPtr<IIOWaitableItem> AsyncStreamAdapter::ToWaitable() AuSPtr<IIOWaitableItem> AsyncStreamAdapter::ToWaitable()
{ {
auto pLoopSource = this->transaction->NewLoopSource(); if (auto pGroup = this->transaction->GetCompletionGroup())
if (!pLoopSource)
{ {
return {}; this->source2.SetGroup(pGroup);
return AuSPtr<IIOWaitableItem>(AuSharedFromThis(), &this->source2);
} }
else
{
auto pLoopSource = this->transaction->NewLoopSource();
if (!pLoopSource)
{
return {};
}
this->source.SetLoopSource(pLoopSource); this->source.SetLoopSource(pLoopSource);
return AuSPtr<IIOWaitableItem>(AuSharedFromThis(), &this->source); return AuSPtr<IIOWaitableItem>(AuSharedFromThis(), &this->source);
}
} }
void AsyncStreamAdapter::ReserveBuffer(AuUInt64 uLength) void AsyncStreamAdapter::ReserveBuffer(AuUInt64 uLength)

View File

@ -396,7 +396,6 @@ namespace Aurora::IO
} }
} }
void IOProcessor::ClearProcessor(const AuSPtr<IOProcessorItem> &processor, bool fatal) void IOProcessor::ClearProcessor(const AuSPtr<IOProcessorItem> &processor, bool fatal)
{ {
if (!AuTryRemove(this->items.allItems, processor)) if (!AuTryRemove(this->items.allItems, processor))
@ -460,6 +459,8 @@ namespace Aurora::IO
SysPushErrorNested("IO Remove Error [!!!]"); SysPushErrorNested("IO Remove Error [!!!]");
} }
} }
processor->Finalize();
} }
AuUInt IOProcessor::FrameFinalize() AuUInt IOProcessor::FrameFinalize()
@ -810,6 +811,11 @@ namespace Aurora::IO
} }
} }
if (pItem->CanRequestTick())
{
pItem->OnReportPumper(item);
}
return item; return item;
} }

View File

@ -125,4 +125,10 @@ namespace Aurora::IO
StopWatch(); StopWatch();
} }
} }
void IOProcessorItem::Finalize()
{
AuResetMember(this->pItem);
AuResetMember(this->pListener);
}
} }

View File

@ -33,5 +33,7 @@ namespace Aurora::IO
// //
void IOAlert(bool force); void IOAlert(bool force);
void Finalize();
}; };
} }

View File

@ -41,8 +41,12 @@ namespace Aurora::IO
if (!AddFrameTemp(item)) if (!AddFrameTemp(item))
{ {
AU_LOCK_GUARD(this->mutex2); AU_LOCK_GUARD(this->mutex2);
this->cvEvent->Set(); bool bInsert = AuTryInsert(this->workSignaled2, item);
return AuTryInsert(this->workSignaled2, item); if (this->cvEvent)
{
this->cvEvent->Set();
}
return bInsert;
} }
return true; return true;
@ -51,7 +55,12 @@ namespace Aurora::IO
bool IOProcessorItems::ScheduleFinish(const AuSPtr<IOProcessorItem> &item, bool unsafe) bool IOProcessorItems::ScheduleFinish(const AuSPtr<IOProcessorItem> &item, bool unsafe)
{ {
AU_TRY_LOCK_GUARD_RET_DEF(this->mutex); AU_TRY_LOCK_GUARD_RET_DEF(this->mutex);
return AuTryInsert(this->crossThreadAbort, AuMakePair(item, unsafe)); auto bRet = AuTryInsert(this->crossThreadAbort, AuMakePair(item, unsafe));
if (this->cvEvent)
{
this->cvEvent->Set();
}
return bRet;
} }
AuList<AuSPtr<IOProcessorItem>> IOProcessorItems::GetBlockedSignals() AuList<AuSPtr<IOProcessorItem>> IOProcessorItems::GetBlockedSignals()

View File

@ -0,0 +1,112 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOWaitableIOCompletionGroup.cpp
Date: 2024-1-6
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "AuIOWaitableIOCompletionGroup.hpp"
#include <Source/IO/CompletionGroup/CompletionGroup.hpp>
namespace Aurora::IO
{
IOWaitableIOCompletionGroup::IOWaitableIOCompletionGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup,
bool bAny) :
pGroup(pGroup),
bAny(bAny)
{
}
IOWaitableIOCompletionGroup::IOWaitableIOCompletionGroup() :
bAny { true }
{
}
void IOWaitableIOCompletionGroup::SetGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup)
{
this->pGroup = pGroup;
if (this->pParent)
{
this->Bind(this->pGroup, this->pParent);
}
else
{
// wait
}
}
bool IOWaitableIOCompletionGroup::IsRunOnOtherTick()
{
return {};
}
bool IOWaitableIOCompletionGroup::IsRunOnTick()
{
return {};
}
bool IOWaitableIOCompletionGroup::CanRequestTick()
{
return true;
}
void IOWaitableIOCompletionGroup::OnReportPumper(const AuSPtr<IIOProcessorManualInvoker> &iface)
{
this->pParent = iface;
if (this->pGroup)
{
this->Bind(this->pGroup, this->pParent);
}
}
void IOWaitableIOCompletionGroup::Bind(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup,
const AuSPtr<IIOProcessorManualInvoker> &pParent)
{
AuStaticCast<CompletionGroup::CompletionGroup>(pGroup)->AddCallbackTick(pParent, this->bAny);
AuResetMember(this->pGroup);
}
bool IOWaitableIOCompletionGroup::IsRunOnSelfIO()
{
return false;
}
AuSPtr<Loop::ILoopSource> IOWaitableIOCompletionGroup::GetSelfIOSource()
{
return {};
}
bool IOWaitableIOCompletionGroup::ApplyRateLimit()
{
return {};
}
bool IOWaitableIOCompletionGroup::IsRunOnSelfIOCheckedOnTimerTick()
{
return true;
}
AuUInt32 IOWaitableIOCompletionGroup::IOTimeoutInMS()
{
return 0;
}
AUKN_SYM AuSPtr<IIOWaitableItem> NewWaitableItemForSleepingCompletionGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup,
bool bAny)
{
auto pThat = AuMakeShared<IOWaitableIOCompletionGroup>(pGroup, bAny);
if (!pThat)
{
SysPushErrorMemory();
return {};
}
return pThat;
}
}

View File

@ -0,0 +1,39 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOWaitableIOCompletionGroup.hpp
Date: 2024-1-6
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
struct IOWaitableIOCompletionGroup : IIOWaitableItem
{
IOWaitableIOCompletionGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup, bool bAny = true);
IOWaitableIOCompletionGroup();
void SetGroup(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup);
void Bind(const AuSPtr<IO::CompletionGroup::ICompletionGroup> &pGroup,
const AuSPtr<IIOProcessorManualInvoker> &pParent);
bool IsRunOnOtherTick() override;
bool IsRunOnTick() override;
bool CanRequestTick() override;
void OnReportPumper(const AuSPtr<IIOProcessorManualInvoker> &iface) override;
bool IsRunOnSelfIO() override;
AuSPtr<Loop::ILoopSource> GetSelfIOSource() override;
bool ApplyRateLimit() override;
AuUInt32 IOTimeoutInMS() override;
bool IsRunOnSelfIOCheckedOnTimerTick() override;
AuSPtr<IO::CompletionGroup::ICompletionGroup> pGroup;
AuSPtr<IIOProcessorManualInvoker> pParent {};
bool bAny { true };
};
}

View File

@ -53,6 +53,8 @@ namespace Aurora::IO::CompletionGroup
{ {
pOld->CleanupForGCWI(); pOld->CleanupForGCWI();
} }
AuResetMember(this->callbackTicks);
} }
bool CompletionGroup::HasItemsActive() bool CompletionGroup::HasItemsActive()
@ -108,6 +110,27 @@ namespace Aurora::IO::CompletionGroup
} }
} }
{
AU_LOCK_GUARD(this->mutex);
for (const auto &[pCallback, bAny] : this->callbackTicks)
{
if (!bAny)
{
continue;
}
try
{
pCallback->InvokeManualTick();
}
catch (...)
{
SysPushErrorCatch();
}
}
}
if (this->workItems.empty()) if (this->workItems.empty())
{ {
this->andPlsDontAllocateFdIfUntouchedEvent.Set(); this->andPlsDontAllocateFdIfUntouchedEvent.Set();
@ -124,6 +147,27 @@ namespace Aurora::IO::CompletionGroup
} }
} }
{
AU_LOCK_GUARD(this->mutex);
for (const auto &[pCallback, bAny] : this->callbackTicks)
{
if (bAny)
{
continue;
}
try
{
pCallback->InvokeManualTick();
}
catch (...)
{
SysPushErrorCatch();
}
}
}
// anyone else? // anyone else?
this->ResetMemoryPins(); this->ResetMemoryPins();
} }
@ -136,6 +180,12 @@ namespace Aurora::IO::CompletionGroup
this->uAdded++; this->uAdded++;
} }
void CompletionGroup::AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny)
{
AU_LOCK_GUARD(this->mutex);
this->callbackTicks.push_back(AuMakePair(pCallbackInvoker, bAny));
}
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAndLoopSource() AuSPtr<Loop::ILoopSource> CompletionGroup::ToAndLoopSource()
{ {
return this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource(); return this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();

View File

@ -41,6 +41,8 @@ namespace Aurora::IO::CompletionGroup
void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) override; void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) override;
void AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny);
private: private:
AuMutex mutex; AuMutex mutex;
AuCriticalSection cs; AuCriticalSection cs;
@ -50,6 +52,7 @@ namespace Aurora::IO::CompletionGroup
AuSPtr<ICompletionGroupHooks> pCallbacks; AuSPtr<ICompletionGroupHooks> pCallbacks;
AuSPtr<Async::IWorkItem> pAnyBarrier; AuSPtr<Async::IWorkItem> pAnyBarrier;
AuSPtr<Async::IWorkItem> pAndBarrier; AuSPtr<Async::IWorkItem> pAndBarrier;
AuList<AuPair<AuSPtr<IIOProcessorManualInvoker>, bool>> callbackTicks;
AuUInt32 uAdded {}; AuUInt32 uAdded {};
AuUInt32 uTriggered {}; AuUInt32 uTriggered {};
bool bNoAny {}; bool bNoAny {};

View File

@ -498,6 +498,11 @@ namespace Aurora::IO::FS
return this; return this;
} }
AuSPtr<CompletionGroup::ICompletionGroup> NtAsyncFileTransaction::GetCompletionGroup()
{
return this->pCompletionGroup_;
}
bool NtAsyncFileTransaction::HasFailed() bool NtAsyncFileTransaction::HasFailed()
{ {
return this->bHasFailed && return this->bHasFailed &&

View File

@ -71,6 +71,7 @@ namespace Aurora::IO::FS
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override; bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override;
CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override; CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override;
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup() override;
bool IDontWannaUsePorts(); bool IDontWannaUsePorts();

View File

@ -259,11 +259,16 @@ namespace Aurora::IO::Net
return true; return true;
} }
CompletionGroup::LinuxAsyncNetworkTransaction *NtAsyncNetworkTransaction::ToCompletionGroupHandle() CompletionGroup::ICompletionGroupWorkHandle *LinuxAsyncNetworkTransaction::ToCompletionGroupHandle()
{ {
return this; return this;
} }
AuSPtr<CompletionGroup::ICompletionGroup> LinuxAsyncNetworkTransaction::GetCompletionGroup()
{
return this->pCompletionGroup_;
}
bool LinuxAsyncNetworkTransaction::Complete() bool LinuxAsyncNetworkTransaction::Complete()
{ {
return this->bLatch; return this->bLatch;

View File

@ -65,6 +65,7 @@ namespace Aurora::IO::Net
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override; bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override;
CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override; CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override;
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup() override;
SocketBase * pSocket; SocketBase * pSocket;
AuSPtr<Loop::ILSEvent> pWaitable; AuSPtr<Loop::ILSEvent> pWaitable;

View File

@ -346,6 +346,11 @@ namespace Aurora::IO::Net
return this; return this;
} }
AuSPtr<CompletionGroup::ICompletionGroup> NtAsyncNetworkTransaction::GetCompletionGroup()
{
return this->pCompletionGroup_;
}
bool NtAsyncNetworkTransaction::Complete() bool NtAsyncNetworkTransaction::Complete()
{ {
return CompleteEx(0); return CompleteEx(0);

View File

@ -65,6 +65,7 @@ namespace Aurora::IO::Net
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override; bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override;
CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override; CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override;
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup() override;
SocketBase * pSocket; SocketBase * pSocket;
OVERLAPPED overlap {}; OVERLAPPED overlap {};