From 3766ea8b86e2e6814cb7b305f12c4bfed933f30f Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Sat, 6 Jan 2024 04:18:13 +0000 Subject: [PATCH] [+] IAsyncTransaction::GetCompletionGroup [*] IO improvements --- Include/Aurora/IO/IAsyncTransaction.hpp | 5 + Include/Aurora/IO/IIOWaitableTickLimiter.hpp | 20 ---- Include/Aurora/IO/IOExperimental.hpp | 7 +- .../Aurora/IO/IOWaitableIOCompletionGroup.hpp | 14 +++ Source/IO/Adapters/AuIOAdapterAsyncStream.cpp | 20 +++- Source/IO/AuIOProcessor.cpp | 8 +- Source/IO/AuIOProcessorItem.cpp | 6 + Source/IO/AuIOProcessorItem.hpp | 2 + Source/IO/AuIOProcessorItems.cpp | 15 ++- Source/IO/AuIOWaitableIOCompletionGroup.cpp | 112 ++++++++++++++++++ Source/IO/AuIOWaitableIOCompletionGroup.hpp | 39 ++++++ Source/IO/CompletionGroup/CompletionGroup.cpp | 50 ++++++++ Source/IO/CompletionGroup/CompletionGroup.hpp | 3 + Source/IO/FS/Async.NT.cpp | 5 + Source/IO/FS/Async.NT.hpp | 1 + Source/IO/Net/AuNetStream.Linux.cpp | 7 +- Source/IO/Net/AuNetStream.Linux.hpp | 1 + Source/IO/Net/AuNetStream.NT.cpp | 5 + Source/IO/Net/AuNetStream.NT.hpp | 1 + 19 files changed, 288 insertions(+), 33 deletions(-) delete mode 100644 Include/Aurora/IO/IIOWaitableTickLimiter.hpp create mode 100644 Include/Aurora/IO/IOWaitableIOCompletionGroup.hpp create mode 100644 Source/IO/AuIOWaitableIOCompletionGroup.cpp create mode 100644 Source/IO/AuIOWaitableIOCompletionGroup.hpp diff --git a/Include/Aurora/IO/IAsyncTransaction.hpp b/Include/Aurora/IO/IAsyncTransaction.hpp index 2c91ec4a..41268a79 100644 --- a/Include/Aurora/IO/IAsyncTransaction.hpp +++ b/Include/Aurora/IO/IAsyncTransaction.hpp @@ -97,6 +97,11 @@ namespace Aurora::IO return nullptr; } + inline virtual AuSPtr GetCompletionGroup() + { + return nullptr; + } + AURT_ADD_USR_DATA; }; } \ No newline at end of file diff --git a/Include/Aurora/IO/IIOWaitableTickLimiter.hpp b/Include/Aurora/IO/IIOWaitableTickLimiter.hpp deleted file mode 100644 index 510be4ea..00000000 --- a/Include/Aurora/IO/IIOWaitableTickLimiter.hpp +++ /dev/null @@ -1,20 +0,0 @@ -/*** - Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. - - File: IIOWaitableTickLimiter.hpp - Date: 2022-6-6 - Author: Reece -***/ -#pragma once - -namespace Aurora::IO -{ - struct IIOWaitableTickLimiter : IIOWaitableItem - { - virtual AuUInt64 SetConstantTick(AuUInt64 ns) = 0; - virtual AuUInt64 SetMinTime(AuUInt64 ns) = 0; - virtual AuUInt32 SetMinTickDelta(AuUInt32 tickDelta) = 0; - }; - - AUKN_SYM AuSPtr NewWaitableTickLimiter(); -} \ No newline at end of file diff --git a/Include/Aurora/IO/IOExperimental.hpp b/Include/Aurora/IO/IOExperimental.hpp index c8af72d8..232789cc 100644 --- a/Include/Aurora/IO/IOExperimental.hpp +++ b/Include/Aurora/IO/IOExperimental.hpp @@ -55,10 +55,11 @@ #include "Adapters/IOAdapterNOPs.hpp" #include "Adapters/IOAdapterZeros.hpp" -#include "IIOWaitableTickLimiter.hpp" -#include "IIOWaitableIOTimer.hpp" +#include "IIOWaitableIOTimer.hpp" #include "IIOWaitableIOLoopSource.hpp" #include "IOPipeInterceptorNop.hpp" -#include "CompletionGroup/CompletionGroup.hpp" \ No newline at end of file +#include "CompletionGroup/CompletionGroup.hpp" + +#include "IOWaitableIOCompletionGroup.hpp" \ No newline at end of file diff --git a/Include/Aurora/IO/IOWaitableIOCompletionGroup.hpp b/Include/Aurora/IO/IOWaitableIOCompletionGroup.hpp new file mode 100644 index 00000000..caade66b --- /dev/null +++ b/Include/Aurora/IO/IOWaitableIOCompletionGroup.hpp @@ -0,0 +1,14 @@ +/*** + Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: IOWaitableIOCompletionGroup.hpp + Date: 2024-1-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + AUKN_SYM AuSPtr NewWaitableItemForSleepingCompletionGroup(const AuSPtr &pGroup, + bool bAny = { true }); +} \ No newline at end of file diff --git a/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp b/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp index 6c4e1d13..421d23cf 100644 --- a/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp +++ b/Source/IO/Adapters/AuIOAdapterAsyncStream.cpp @@ -9,6 +9,7 @@ #include #include "AuIOAdapterAsyncStream.hpp" #include "../AuIOWaitableIOLoopSource.hpp" +#include "../AuIOWaitableIOCompletionGroup.hpp" namespace Aurora::IO::Adapters { @@ -97,6 +98,7 @@ namespace Aurora::IO::Adapters int locked {}; IOWatachableIOLoopSource source; + IOWaitableIOCompletionGroup source2; // impl AsyncStreamReader reader; @@ -621,14 +623,22 @@ namespace Aurora::IO::Adapters AuSPtr AsyncStreamAdapter::ToWaitable() { - auto pLoopSource = this->transaction->NewLoopSource(); - if (!pLoopSource) + if (auto pGroup = this->transaction->GetCompletionGroup()) { - return {}; + this->source2.SetGroup(pGroup); + return AuSPtr(AuSharedFromThis(), &this->source2); } + else + { + auto pLoopSource = this->transaction->NewLoopSource(); + if (!pLoopSource) + { + return {}; + } - this->source.SetLoopSource(pLoopSource); - return AuSPtr(AuSharedFromThis(), &this->source); + this->source.SetLoopSource(pLoopSource); + return AuSPtr(AuSharedFromThis(), &this->source); + } } void AsyncStreamAdapter::ReserveBuffer(AuUInt64 uLength) diff --git a/Source/IO/AuIOProcessor.cpp b/Source/IO/AuIOProcessor.cpp index c25876b6..06cd905c 100644 --- a/Source/IO/AuIOProcessor.cpp +++ b/Source/IO/AuIOProcessor.cpp @@ -396,7 +396,6 @@ namespace Aurora::IO } } - void IOProcessor::ClearProcessor(const AuSPtr &processor, bool fatal) { if (!AuTryRemove(this->items.allItems, processor)) @@ -460,6 +459,8 @@ namespace Aurora::IO SysPushErrorNested("IO Remove Error [!!!]"); } } + + processor->Finalize(); } AuUInt IOProcessor::FrameFinalize() @@ -810,6 +811,11 @@ namespace Aurora::IO } } + if (pItem->CanRequestTick()) + { + pItem->OnReportPumper(item); + } + return item; } diff --git a/Source/IO/AuIOProcessorItem.cpp b/Source/IO/AuIOProcessorItem.cpp index 23e97ba7..139cdbad 100644 --- a/Source/IO/AuIOProcessorItem.cpp +++ b/Source/IO/AuIOProcessorItem.cpp @@ -125,4 +125,10 @@ namespace Aurora::IO StopWatch(); } } + + void IOProcessorItem::Finalize() + { + AuResetMember(this->pItem); + AuResetMember(this->pListener); + } } \ No newline at end of file diff --git a/Source/IO/AuIOProcessorItem.hpp b/Source/IO/AuIOProcessorItem.hpp index 87eb811b..e105bdd4 100644 --- a/Source/IO/AuIOProcessorItem.hpp +++ b/Source/IO/AuIOProcessorItem.hpp @@ -33,5 +33,7 @@ namespace Aurora::IO // void IOAlert(bool force); + + void Finalize(); }; } \ No newline at end of file diff --git a/Source/IO/AuIOProcessorItems.cpp b/Source/IO/AuIOProcessorItems.cpp index 311b5b69..cafe08c4 100644 --- a/Source/IO/AuIOProcessorItems.cpp +++ b/Source/IO/AuIOProcessorItems.cpp @@ -41,8 +41,12 @@ namespace Aurora::IO if (!AddFrameTemp(item)) { AU_LOCK_GUARD(this->mutex2); - this->cvEvent->Set(); - return AuTryInsert(this->workSignaled2, item); + bool bInsert = AuTryInsert(this->workSignaled2, item); + if (this->cvEvent) + { + this->cvEvent->Set(); + } + return bInsert; } return true; @@ -51,7 +55,12 @@ namespace Aurora::IO bool IOProcessorItems::ScheduleFinish(const AuSPtr &item, bool unsafe) { AU_TRY_LOCK_GUARD_RET_DEF(this->mutex); - return AuTryInsert(this->crossThreadAbort, AuMakePair(item, unsafe)); + auto bRet = AuTryInsert(this->crossThreadAbort, AuMakePair(item, unsafe)); + if (this->cvEvent) + { + this->cvEvent->Set(); + } + return bRet; } AuList> IOProcessorItems::GetBlockedSignals() diff --git a/Source/IO/AuIOWaitableIOCompletionGroup.cpp b/Source/IO/AuIOWaitableIOCompletionGroup.cpp new file mode 100644 index 00000000..5a423a92 --- /dev/null +++ b/Source/IO/AuIOWaitableIOCompletionGroup.cpp @@ -0,0 +1,112 @@ +/*** + Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuIOWaitableIOCompletionGroup.cpp + Date: 2024-1-6 + Author: Reece +***/ +#include +#include +#include "AuIOWaitableIOCompletionGroup.hpp" +#include + +namespace Aurora::IO +{ + IOWaitableIOCompletionGroup::IOWaitableIOCompletionGroup(const AuSPtr &pGroup, + bool bAny) : + pGroup(pGroup), + bAny(bAny) + { + + } + + IOWaitableIOCompletionGroup::IOWaitableIOCompletionGroup() : + bAny { true } + { + + } + + void IOWaitableIOCompletionGroup::SetGroup(const AuSPtr &pGroup) + { + this->pGroup = pGroup; + + if (this->pParent) + { + this->Bind(this->pGroup, this->pParent); + } + else + { + // wait + } + } + + bool IOWaitableIOCompletionGroup::IsRunOnOtherTick() + { + return {}; + } + + bool IOWaitableIOCompletionGroup::IsRunOnTick() + { + return {}; + } + + bool IOWaitableIOCompletionGroup::CanRequestTick() + { + return true; + } + + void IOWaitableIOCompletionGroup::OnReportPumper(const AuSPtr &iface) + { + this->pParent = iface; + + if (this->pGroup) + { + this->Bind(this->pGroup, this->pParent); + } + } + + void IOWaitableIOCompletionGroup::Bind(const AuSPtr &pGroup, + const AuSPtr &pParent) + { + AuStaticCast(pGroup)->AddCallbackTick(pParent, this->bAny); + AuResetMember(this->pGroup); + } + + bool IOWaitableIOCompletionGroup::IsRunOnSelfIO() + { + return false; + } + + AuSPtr IOWaitableIOCompletionGroup::GetSelfIOSource() + { + return {}; + } + + bool IOWaitableIOCompletionGroup::ApplyRateLimit() + { + return {}; + } + + bool IOWaitableIOCompletionGroup::IsRunOnSelfIOCheckedOnTimerTick() + { + return true; + } + + AuUInt32 IOWaitableIOCompletionGroup::IOTimeoutInMS() + { + return 0; + } + + AUKN_SYM AuSPtr NewWaitableItemForSleepingCompletionGroup(const AuSPtr &pGroup, + bool bAny) + { + auto pThat = AuMakeShared(pGroup, bAny); + if (!pThat) + { + SysPushErrorMemory(); + return {}; + } + + return pThat; + } +} \ No newline at end of file diff --git a/Source/IO/AuIOWaitableIOCompletionGroup.hpp b/Source/IO/AuIOWaitableIOCompletionGroup.hpp new file mode 100644 index 00000000..9adc2e07 --- /dev/null +++ b/Source/IO/AuIOWaitableIOCompletionGroup.hpp @@ -0,0 +1,39 @@ +/*** + Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: AuIOWaitableIOCompletionGroup.hpp + Date: 2024-1-6 + Author: Reece +***/ +#pragma once + +namespace Aurora::IO +{ + struct IOWaitableIOCompletionGroup : IIOWaitableItem + { + IOWaitableIOCompletionGroup(const AuSPtr &pGroup, bool bAny = true); + IOWaitableIOCompletionGroup(); + + void SetGroup(const AuSPtr &pGroup); + void Bind(const AuSPtr &pGroup, + const AuSPtr &pParent); + + bool IsRunOnOtherTick() override; + bool IsRunOnTick() override; + + bool CanRequestTick() override; + void OnReportPumper(const AuSPtr &iface) override; + + bool IsRunOnSelfIO() override; + AuSPtr GetSelfIOSource() override; + + bool ApplyRateLimit() override; + + AuUInt32 IOTimeoutInMS() override; + bool IsRunOnSelfIOCheckedOnTimerTick() override; + + AuSPtr pGroup; + AuSPtr pParent {}; + bool bAny { true }; + }; +} \ No newline at end of file diff --git a/Source/IO/CompletionGroup/CompletionGroup.cpp b/Source/IO/CompletionGroup/CompletionGroup.cpp index cd43e96f..ca9fbeac 100644 --- a/Source/IO/CompletionGroup/CompletionGroup.cpp +++ b/Source/IO/CompletionGroup/CompletionGroup.cpp @@ -53,6 +53,8 @@ namespace Aurora::IO::CompletionGroup { pOld->CleanupForGCWI(); } + + AuResetMember(this->callbackTicks); } bool CompletionGroup::HasItemsActive() @@ -108,6 +110,27 @@ namespace Aurora::IO::CompletionGroup } } + { + AU_LOCK_GUARD(this->mutex); + + for (const auto &[pCallback, bAny] : this->callbackTicks) + { + if (!bAny) + { + continue; + } + + try + { + pCallback->InvokeManualTick(); + } + catch (...) + { + SysPushErrorCatch(); + } + } + } + if (this->workItems.empty()) { this->andPlsDontAllocateFdIfUntouchedEvent.Set(); @@ -124,6 +147,27 @@ namespace Aurora::IO::CompletionGroup } } + { + AU_LOCK_GUARD(this->mutex); + + for (const auto &[pCallback, bAny] : this->callbackTicks) + { + if (bAny) + { + continue; + } + + try + { + pCallback->InvokeManualTick(); + } + catch (...) + { + SysPushErrorCatch(); + } + } + } + // anyone else? this->ResetMemoryPins(); } @@ -136,6 +180,12 @@ namespace Aurora::IO::CompletionGroup this->uAdded++; } + void CompletionGroup::AddCallbackTick(const AuSPtr &pCallbackInvoker, bool bAny) + { + AU_LOCK_GUARD(this->mutex); + this->callbackTicks.push_back(AuMakePair(pCallbackInvoker, bAny)); + } + AuSPtr CompletionGroup::ToAndLoopSource() { return this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource(); diff --git a/Source/IO/CompletionGroup/CompletionGroup.hpp b/Source/IO/CompletionGroup/CompletionGroup.hpp index aa91fab4..7e158421 100644 --- a/Source/IO/CompletionGroup/CompletionGroup.hpp +++ b/Source/IO/CompletionGroup/CompletionGroup.hpp @@ -41,6 +41,8 @@ namespace Aurora::IO::CompletionGroup void AddWorkItem(AuSPtr pCompletable) override; + void AddCallbackTick(const AuSPtr &pCallbackInvoker, bool bAny); + private: AuMutex mutex; AuCriticalSection cs; @@ -50,6 +52,7 @@ namespace Aurora::IO::CompletionGroup AuSPtr pCallbacks; AuSPtr pAnyBarrier; AuSPtr pAndBarrier; + AuList, bool>> callbackTicks; AuUInt32 uAdded {}; AuUInt32 uTriggered {}; bool bNoAny {}; diff --git a/Source/IO/FS/Async.NT.cpp b/Source/IO/FS/Async.NT.cpp index 2c8467e5..d844dc88 100644 --- a/Source/IO/FS/Async.NT.cpp +++ b/Source/IO/FS/Async.NT.cpp @@ -498,6 +498,11 @@ namespace Aurora::IO::FS return this; } + AuSPtr NtAsyncFileTransaction::GetCompletionGroup() + { + return this->pCompletionGroup_; + } + bool NtAsyncFileTransaction::HasFailed() { return this->bHasFailed && diff --git a/Source/IO/FS/Async.NT.hpp b/Source/IO/FS/Async.NT.hpp index 41947ddd..4e3e8358 100644 --- a/Source/IO/FS/Async.NT.hpp +++ b/Source/IO/FS/Async.NT.hpp @@ -71,6 +71,7 @@ namespace Aurora::IO::FS bool TryAttachToCompletionGroup(const AuSPtr &pCompletionGroup) override; CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override; + AuSPtr GetCompletionGroup() override; bool IDontWannaUsePorts(); diff --git a/Source/IO/Net/AuNetStream.Linux.cpp b/Source/IO/Net/AuNetStream.Linux.cpp index 844d154a..a9c1e507 100644 --- a/Source/IO/Net/AuNetStream.Linux.cpp +++ b/Source/IO/Net/AuNetStream.Linux.cpp @@ -259,11 +259,16 @@ namespace Aurora::IO::Net return true; } - CompletionGroup::LinuxAsyncNetworkTransaction *NtAsyncNetworkTransaction::ToCompletionGroupHandle() + CompletionGroup::ICompletionGroupWorkHandle *LinuxAsyncNetworkTransaction::ToCompletionGroupHandle() { return this; } + AuSPtr LinuxAsyncNetworkTransaction::GetCompletionGroup() + { + return this->pCompletionGroup_; + } + bool LinuxAsyncNetworkTransaction::Complete() { return this->bLatch; diff --git a/Source/IO/Net/AuNetStream.Linux.hpp b/Source/IO/Net/AuNetStream.Linux.hpp index ee9689ff..0ae35a48 100644 --- a/Source/IO/Net/AuNetStream.Linux.hpp +++ b/Source/IO/Net/AuNetStream.Linux.hpp @@ -65,6 +65,7 @@ namespace Aurora::IO::Net bool TryAttachToCompletionGroup(const AuSPtr &pCompletionGroup) override; CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override; + AuSPtr GetCompletionGroup() override; SocketBase * pSocket; AuSPtr pWaitable; diff --git a/Source/IO/Net/AuNetStream.NT.cpp b/Source/IO/Net/AuNetStream.NT.cpp index 15331fb9..de721f3a 100644 --- a/Source/IO/Net/AuNetStream.NT.cpp +++ b/Source/IO/Net/AuNetStream.NT.cpp @@ -346,6 +346,11 @@ namespace Aurora::IO::Net return this; } + AuSPtr NtAsyncNetworkTransaction::GetCompletionGroup() + { + return this->pCompletionGroup_; + } + bool NtAsyncNetworkTransaction::Complete() { return CompleteEx(0); diff --git a/Source/IO/Net/AuNetStream.NT.hpp b/Source/IO/Net/AuNetStream.NT.hpp index 5d1053f3..f7057b0d 100644 --- a/Source/IO/Net/AuNetStream.NT.hpp +++ b/Source/IO/Net/AuNetStream.NT.hpp @@ -65,6 +65,7 @@ namespace Aurora::IO::Net bool TryAttachToCompletionGroup(const AuSPtr &pCompletionGroup) override; CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override; + AuSPtr GetCompletionGroup() override; SocketBase * pSocket; OVERLAPPED overlap {};