[+] IO completion groups

This commit is contained in:
Reece Wilson 2023-12-28 16:49:11 +00:00
parent be2b781ed6
commit 662dbac0c1
38 changed files with 914 additions and 142 deletions

View File

@ -0,0 +1,18 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroup.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
namespace Aurora::IO::CompletionGroup
{
struct CompletionGroup;
}
#include "ICompletionGroupWorkHandle.hpp"
#include "ICompletionGroupWorkItem.hpp"
#include "ICompletionGroupHooks.hpp"
#include "ICompletionGroup.hpp"

View File

@ -0,0 +1,41 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroup.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct IWorkItem;
}
namespace Aurora::IO::Loop
{
struct ILSEvent;
}
namespace Aurora::IO::CompletionGroup
{
struct ICompletionGroup
{
virtual AuSPtr<Loop::ILoopSource> ToAndLoopSource() = 0;
virtual AuSPtr<Loop::ILoopSource> ToAnyLoopSource() = 0;
virtual AuSPtr<Async::IWorkItem> OnCompletion() = 0;
virtual AuSPtr<Async::IWorkItem> OnSingleCompletion() = 0;
virtual bool HasCompleted() = 0;
virtual AuPair<AuUInt32, AuUInt32> GetStats() = 0;
virtual void SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks) = 0;
virtual void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) = 0;
virtual AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() = 0;
virtual void TryTrigger() = 0;
};
AUKN_SYM AuSPtr<ICompletionGroup> NewCompletionGroup();
}

View File

@ -0,0 +1,16 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ICompletionGroupHooks.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
namespace Aurora::IO::CompletionGroup
{
AUI_INTERFACE(ICompletionGroupHooks,
AUI_METHOD(void, OnHandleComplete, (const AuSPtr<ICompletionGroupWorkItem>, pTriggered)),
AUI_METHOD(void, OnComplete, ())
)
}

View File

@ -0,0 +1,18 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ICompletionGroupWorkHandle.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
namespace Aurora::IO::CompletionGroup
{
struct ICompletionGroupWorkHandle
{
// pls gimme a vtable of a single purecall stub
inline virtual ~ICompletionGroupWorkHandle()
{ };
};
}

View File

@ -0,0 +1,17 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ICompletionGroupWorkItem.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
namespace Aurora::IO::CompletionGroup
{
struct ICompletionGroupWorkItem : ICompletionGroupWorkHandle
{
virtual bool HasCompletedForGCWI() = 0;
virtual void CleanupForGCWI() = 0;
};
}

View File

@ -12,6 +12,13 @@ namespace Aurora::IO::Loop
struct ILoopSource;
}
namespace Aurora::IO::CompletionGroup
{
struct ICompletionGroup;
}
#include "CompletionGroup/ICompletionGroupWorkHandle.hpp"
namespace Aurora::IO
{
/**
@ -27,18 +34,18 @@ namespace Aurora::IO
virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0;
/**
* @brief Non-blocking is-signaled and call callback poll routine
* @brief "Non-blocking" is-signaled and call callback poll routine (similar to nt alertable sleeps of a period zero)
*/
virtual bool Complete() = 0;
/**
* @brief
* @brief Non-blocking has-failed (no callbacks, no alert sleep)
* @return
*/
virtual bool Failed() = 0;
virtual bool HasFailed() = 0;
/**
* @brief non-blocking is-signaled (...and dispatched via ::Complete() or other IO yield)
* @brief Non-blocking is-signaled (...and dispatched via ::Complete() or other IO yield) (no callbacks, no alert sleep)
* @return
*/
virtual bool HasCompleted() = 0;
@ -78,6 +85,18 @@ namespace Aurora::IO
virtual void SetBaseOffset(AuUInt64 uBaseOffset) = 0;
// TODO: remove me (new api stub)
inline virtual bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
{
return false;
}
// TODO: remove me (new api stub)
inline virtual CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle()
{
return nullptr;
}
AURT_ADD_USR_DATA;
};
}

View File

@ -56,4 +56,6 @@
#include "IIOWaitableIOTimer.hpp"
#include "IIOWaitableIOLoopSource.hpp"
#include "IOPipeInterceptorNop.hpp"
#include "IOPipeInterceptorNop.hpp"
#include "CompletionGroup/CompletionGroup.hpp"

View File

@ -60,6 +60,9 @@ namespace Aurora::IO::Loop
// fast objects
eSourceFastSemaphore,
eSourceFastEvent,
eSourceFastMutex
eSourceFastMutex,
// batched aio completion groups
eSourceCompletionGroup
};
}

View File

@ -58,6 +58,9 @@ namespace Aurora::Async
protected:
bool CheckAlive();
virtual void DispatchTask(IWorkItemHandler::ProcessInfo &info);
virtual void Cleanup();
private:
void RunAsyncLocked();
void RunAsyncLocked2();
@ -67,9 +70,6 @@ namespace Aurora::Async
void DispatchEx(bool check);
void DispatchExLocked(bool check);
virtual void DispatchTask(IWorkItemHandler::ProcessInfo &info);
virtual void Cleanup();
IThreadPoolInternal *owner_ {};
AuSPtr<ThreadState> GetState();

View File

@ -268,7 +268,7 @@ namespace Aurora::IO::Adapters
}
// Transaction error
if (parent->transaction->Failed())
if (parent->transaction->HasFailed())
{
parent->bAsyncActive = false;
parent->transaction->Reset();
@ -374,7 +374,7 @@ namespace Aurora::IO::Adapters
return EStreamError::eErrorNone;
}
if (parent->transaction && parent->transaction->Failed())
if (parent->transaction && parent->transaction->HasFailed())
{
SysPushErrorIO("AIO transaction read failed: {}", parent->transaction->GetOSErrorCode());
parent->errorCode = EStreamError::eErrorStreamInterrupted;
@ -503,7 +503,7 @@ namespace Aurora::IO::Adapters
{
if (parent->transaction->Complete())
{
if (parent->transaction->Failed())
if (parent->transaction->HasFailed())
{
SysPushErrorIO("AIO transaction write failed: {}", parent->transaction->GetOSErrorCode());
parent->errorCode = EStreamError::eErrorStreamInterrupted;

View File

@ -0,0 +1,233 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroup.cpp
Date: 2023-12-28
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "CompletionGroup.hpp"
#include "CompletionGroupAndedIOWorkItem.hpp"
namespace Aurora::IO::CompletionGroup
{
struct CompletionGroup;
CompletionGroup::CompletionGroup() :
anyProbablyAlwaysPresentEvent(this, false),
andPlsDontAllocateFdIfUntouchedEvent(this, true)
{
}
CompletionGroup::~CompletionGroup()
{
this->ResetMemoryPins();
}
bool CompletionGroup::HasCompleted()
{
return this->uTriggered == this->uAdded &&
bool(this->uAdded);
}
AuPair<AuUInt32, AuUInt32> CompletionGroup::GetStats()
{
return AuMakePair(this->uTriggered, this->uAdded);
}
void CompletionGroup::SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks)
{
AU_LOCK_GUARD(this->mutex);
this->pCallbacks = pCallbacks;
}
void CompletionGroup::ResetMemoryPins()
{
AuResetMember(this->pCallbacks);
AuResetMember(this->pAnyBarrier);
AuResetMember(this->pAndBarrier);
for (const auto &pOld : this->workItems)
{
pOld->CleanupForGCWI();
}
}
bool CompletionGroup::HasItemsActive()
{
return this->workItems.size();
}
void CompletionGroup::DoIOTick()
{
AU_LOCK_GUARD(this->mutex);
for (auto itr = this->workItems.begin();
itr != this->workItems.end();
)
{
if (itr->get()->HasCompletedForGCWI())
{
auto that = *itr;
itr = this->workItems.erase(itr);
this->uTriggered++;
if (this->pCallbacks)
{
try
{
this->pCallbacks->OnHandleComplete(that);
}
catch (...)
{
SysPushErrorCatch();
}
}
that->CleanupForGCWI();
}
else
{
itr++;
}
}
if (this->workItems.empty())
{
this->andPlsDontAllocateFdIfUntouchedEvent.Set();
if (AuExchange(this->pCallbacks, {}))
{
try
{
this->pCallbacks->OnComplete();
}
catch (...)
{
SysPushErrorCatch();
}
}
// anyone else?
this->ResetMemoryPins();
}
}
void CompletionGroup::AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable)
{
AU_LOCK_GUARD(this->mutex);
this->workItems.push_back(pCompletable);
this->uAdded++;
}
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAndLoopSource()
{
return this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
}
AuSPtr<Loop::ILoopSource> CompletionGroup::ToAnyLoopSource()
{
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
}
AuSPtr<Loop::ILSEvent> CompletionGroup::GetTriggerLoopSource()
{
return this->anyProbablyAlwaysPresentEvent.GetLoopSource();
}
void CompletionGroup::TryTrigger()
{
if (auto pSource = GetTriggerLoopSource())
{
pSource->Set();
}
this->DoIOTick();
}
AuSPtr<Async::IWorkItem> CompletionGroup::OnCompletion()
{
AU_LOCK_GUARD(this->mutex);
if (this->pAndBarrier)
{
return this->pAndBarrier;
}
auto pWorker = AuAsync::GetCurrentWorkerPId().GetPool();
if (!pWorker)
{
pWorker = AuUnsafeRaiiToShared(AuAsync::GetAsyncApp());
}
auto pRet = AuMakeShared<CompletionGroupAndedIOWorkItem>((AuAsync::IThreadPoolInternal *)AuStaticPointerCast<AuAsync::ThreadPool>(pWorker).get(),
AuWorkerPId_t {},
this->SharedFromThis());
if (!pRet)
{
SysPushErrorMemory();
return {};
}
// prevent attaching a singular loop source to a loop queue multiple times
if (this->pAnyBarrier)
{
auto pEvent = this->andPlsDontAllocateFdIfUntouchedEvent.GetLoopSource();
if (!pEvent)
{
return {};
}
pRet->SetSchedByLoopSource(pEvent)->Dispatch();
}
else
{
auto pEvent = this->anyProbablyAlwaysPresentEvent.GetLoopSource();
if (!pEvent)
{
return {};
}
pRet->SetSchedByLoopSource(pEvent)->Dispatch();
this->bNoAny = true;
}
this->pAndBarrier = pRet;
return pRet;
}
AuSPtr<Async::IWorkItem> CompletionGroup::OnSingleCompletion()
{
AU_LOCK_GUARD(this->mutex);
if (this->pAnyBarrier)
{
return this->pAnyBarrier;
}
if (!this->bNoAny)
{
SysPushErrorGeneric("To prevent double LoopQueue::SourceAdd on the same source, you must not call ::OnSingleCompletion() after ::OnCompletion() in this specific order");
return {};
}
auto pRet = AuAsync::NewFence();
if (!pRet)
{
SysPushErrorNested();
return {};
}
pRet->SetSchedByLoopSource(this->anyProbablyAlwaysPresentEvent.GetLoopSource())->Dispatch();
this->pAnyBarrier = pRet;
return pRet;
}
AUKN_SYM AuSPtr<ICompletionGroup> NewCompletionGroup()
{
return AuMakeShared<CompletionGroup>();
}
}

View File

@ -0,0 +1,53 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroup.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
#include "CompletionQuantumEventProvider.hpp"
namespace Aurora::IO::CompletionGroup
{
struct CompletionGroup :
AuEnableSharedFromThis<CompletionGroup>,
ICompletionGroup
{
CompletionGroup();
~CompletionGroup();
AuSPtr<Loop::ILoopSource> ToAndLoopSource() override;
AuSPtr<Loop::ILoopSource> ToAnyLoopSource() override;
AuSPtr<Async::IWorkItem> OnCompletion() override;
AuSPtr<Async::IWorkItem> OnSingleCompletion() override;
AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() override;
void TryTrigger() override;
bool HasCompleted() override;
AuPair<AuUInt32, AuUInt32> GetStats() override;
void SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks) override;
void DoIOTick();
void ResetMemoryPins();
bool HasItemsActive();
void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) override;
private:
AuCriticalSection mutex;
CompletionQuantumEventProvider anyProbablyAlwaysPresentEvent;
CompletionQuantumEventProvider andPlsDontAllocateFdIfUntouchedEvent;
AuList<AuSPtr<ICompletionGroupWorkItem>> workItems;
AuSPtr<ICompletionGroupHooks> pCallbacks;
AuSPtr<Async::IWorkItem> pAnyBarrier;
AuSPtr<Async::IWorkItem> pAndBarrier;
AuUInt32 uAdded {};
AuUInt32 uTriggered {};
bool bNoAny {};
};
}

View File

@ -0,0 +1,33 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroupAndedIOWorkItem.cpp
Date: 2023-12-28
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "CompletionGroup.hpp"
#include "CompletionGroupAndedIOWorkItem.hpp"
namespace Aurora::IO::CompletionGroup
{
CompletionGroupAndedIOWorkItem::CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner,
const AuWorkerID &worker,
AuSPtr<CompletionGroup> pParent) :
Async::WorkItem(owner, worker, nullptr),
pParent(pParent)
{
}
void CompletionGroupAndedIOWorkItem::DispatchTask(Async::IWorkItemHandler::ProcessInfo &info)
{
this->pParent->DoIOTick();
if (this->pParent->HasItemsActive())
{
info = AuAsync::ETickType::eSchedule;
}
}
}

View File

@ -0,0 +1,24 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroupAndedIOWorkItem.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
#include <Source/Async/WorkItem.hpp>
namespace Aurora::IO::CompletionGroup
{
struct CompletionGroupAndedIOWorkItem : Async::WorkItem
{
CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner,
const AuWorkerID &worker,
AuSPtr<CompletionGroup> pParent);
void DispatchTask(Async::IWorkItemHandler::ProcessInfo &info) override;
AuSPtr<CompletionGroup> pParent;
};
}

View File

@ -0,0 +1,68 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroupLoopSource.cpp
Date: 2023-12-28
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "CompletionGroup.hpp"
#include "CompletionGroupLoopSource.hpp"
namespace Aurora::IO::CompletionGroup
{
CompletionGroupLoopSource::CompletionGroupLoopSource(CompletionGroup *pParent, bool bAnd) :
pParent(pParent),
bIsAnd(bAnd),
Loop::LSEvent()
{ }
bool CompletionGroupLoopSource::IsSignaled()
{
if (!Loop::LSEvent::IsSignaled())
{
return false;
}
this->pParent->DoIOTick();
if (this->bIsAnd)
{
return !this->pParent->HasItemsActive();
}
else
{
return true;
}
}
bool CompletionGroupLoopSource::OnTrigger(AuUInt handle)
{
if (!Loop::LSEvent::OnTrigger(handle))
{
return false;
}
this->pParent->DoIOTick();
if (this->bIsAnd)
{
return !this->pParent->HasItemsActive();
}
else
{
return true;
}
}
bool CompletionGroupLoopSource::DoInit(bool bSignaledAlready)
{
return this->TryInit(bSignaledAlready, true, true);
}
Loop::ELoopSource CompletionGroupLoopSource::GetType()
{
return Loop::ELoopSource::eSourceCompletionGroup;
}
}

View File

@ -0,0 +1,35 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionGroupLoopSource.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
#include <Source/IO/Loop/LSLocalEvent.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
namespace Aurora::IO::CompletionGroup
{
struct CompletionGroupLoopSource :
Loop::LSEvent
// Loop::LSLocalEvent -> not going to work on NT. ugh
// Nt is going to expect event handles...
// ...outside of XPs dumb-fuck SignalAndWait api because nobody at msft knew how to implement a condvar, every other IO subsystem is documented to expect event handles, not semaphore kernel objects.
// linshid: we always use eventfd
// posix: we could always fallback on pipe fds
// bsd: doesnt matter so long as the kqueue supplier for loopqueues is in place
{
CompletionGroupLoopSource(CompletionGroup *pParent, bool bAnd);
bool DoInit(bool bSignaledAlready);
bool IsSignaled() override;
bool OnTrigger(AuUInt handle) override;
Loop::ELoopSource GetType() override;
CompletionGroup *pParent;
bool bIsAnd {};
};
}

View File

@ -0,0 +1,48 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionQuantumEventProvider.cpp
Date: 2023-12-28
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include <Aurora/IO/IOExperimental.hpp>
#include "CompletionGroup.hpp"
#include "CompletionQuantumEventProvider.hpp"
namespace Aurora::IO::CompletionGroup
{
CompletionQuantumEventProvider::CompletionQuantumEventProvider(CompletionGroup *pParent, bool bAnd) :
pParent(pParent),
loopSource(pParent, bAnd)
{
}
void CompletionQuantumEventProvider::Set()
{
this->bWasSet = true;
}
AuSPtr<Loop::ILSEvent> CompletionQuantumEventProvider::GetLoopSource()
{
if (!AuThreading::InitOnceLocker::TryLock(&this->initOnce, true))
{
this->initOnce.Wait();
}
else
{
this->bHasInit = this->loopSource.DoInit(this->bWasSet);
AuThreading::InitOnceLocker::Finish(&this->initOnce, !this->bHasInit);
}
if (!this->bHasInit)
{
return {};
}
return AuSPtr<Loop::ILSEvent>(this->pParent->SharedFromThis(),
&this->loopSource);
}
}

View File

@ -0,0 +1,29 @@
/***
Copyright (C) 2023 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompletionQuantumEventProvider.hpp
Date: 2023-12-28
Author: Reece
***/
#pragma once
#include "CompletionGroupLoopSource.hpp"
namespace Aurora::IO::CompletionGroup
{
struct CompletionQuantumEventProvider
{
CompletionQuantumEventProvider(CompletionGroup *pParent, bool bAnd);
void Set();
AuSPtr<Loop::ILSEvent> GetLoopSource();
private:
CompletionGroup *pParent {};
bool bWasSet {};
bool bHasInit {};
AuInitOnce initOnce;
CompletionGroupLoopSource loopSource;
};
}

View File

@ -484,7 +484,7 @@ namespace Aurora::IO::FS
return this->bTxFinished_;
}
bool LinuxAsyncFileTransaction::Failed()
bool LinuxAsyncFileTransaction::HasFailed()
{
return this->hasError_;
}

View File

@ -56,7 +56,7 @@ namespace Aurora::IO::FS
bool Complete() override;
bool Failed() override;
bool HasFailed() override;
AuUInt GetOSErrorCode() override;
bool HasCompleted() override;

View File

@ -12,6 +12,7 @@
#include "Async.NT.hpp"
#include "FileAdvisory.NT.hpp"
#include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
#include <Source/IO/IPC/AuIPCPipe.NT.hpp>
namespace Aurora::IO::FS
@ -59,7 +60,10 @@ namespace Aurora::IO::FS
NtAsyncFileTransaction::~NtAsyncFileTransaction()
{
AuWin32CloseHandle(this->event);
if (this->bOwnsEvent_)
{
AuWin32CloseHandle(this->event);
}
}
AuSPtr<IIOHandle> NtAsyncFileStream::GetHandle()
@ -106,6 +110,8 @@ namespace Aurora::IO::FS
LARGE_INTEGER i {};
i.QuadPart = offset;
parameters.outVariable = 0;
auto hOptSafe = this->pHandle_->GetOSReadHandleSafe();
if (!hOptSafe)
{
@ -149,6 +155,8 @@ namespace Aurora::IO::FS
LARGE_INTEGER i {};
i.QuadPart = offset;
parameters.outVariable = 0;
auto hOptSafe = this->pHandle_->GetOSWriteHandleSafe();
if (!hOptSafe)
{
@ -464,7 +472,33 @@ namespace Aurora::IO::FS
AuResetMember(this->pMemoryHold);
}
bool NtAsyncFileTransaction::Failed()
bool NtAsyncFileTransaction::TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
{
if (!this->bOwnsEvent_ ||
!pCompletionGroup)
{
return false;
}
auto pLoopSource = pCompletionGroup->GetTriggerLoopSource();
if (!pLoopSource)
{
return false;
}
this->bOwnsEvent_ = false;
this->event = this->overlap.hEvent = (HANDLE)AuStaticCast<Loop::LSEvent>(pLoopSource)->GetHandle();
pCompletionGroup->AddWorkItem(this->SharedFromThis());
this->pCompletionGroup_ = pCompletionGroup;
return true;
}
CompletionGroup::ICompletionGroupWorkHandle *NtAsyncFileTransaction::ToCompletionGroupHandle()
{
return this;
}
bool NtAsyncFileTransaction::HasFailed()
{
return this->bHasFailed &&
this->dwOsErrorCode != ERROR_BROKEN_PIPE &&
@ -537,6 +571,16 @@ namespace Aurora::IO::FS
return CompleteEx(false);
}
bool NtAsyncFileTransaction::HasCompletedForGCWI()
{
return this->HasCompleted();
}
void NtAsyncFileTransaction::CleanupForGCWI()
{
this->event = INVALID_HANDLE_VALUE;
}
bool NtAsyncFileTransaction::HasCompleted()
{
return bool(this->dwLastBytes) ||
@ -591,7 +635,18 @@ namespace Aurora::IO::FS
AuSPtr<AuLoop::ILoopSource> NtAsyncFileTransaction::NewLoopSource()
{
SysCheckRetExpNotNullMemory(AuMakeShared<NtAsyncFileTransactionLoopSource>(AuSharedFromThis()), {});
if (this->bOwnsEvent_)
{
SysCheckRetExpNotNullMemory(AuMakeShared<NtAsyncFileTransactionLoopSource>(AuSharedFromThis()), {});
}
else if (this->pCompletionGroup_)
{
return this->pCompletionGroup_->ToAnyLoopSource();
}
else
{
return {};
}
}
AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)

View File

@ -30,7 +30,10 @@ namespace Aurora::IO::FS
AuSPtr<IIOHandle> pHandle_;
};
struct NtAsyncFileTransaction : IAsyncTransaction, AuEnableSharedFromThis<NtAsyncFileTransaction>
struct NtAsyncFileTransaction :
IAsyncTransaction,
CompletionGroup::ICompletionGroupWorkItem,
AuEnableSharedFromThis<NtAsyncFileTransaction>
{
~NtAsyncFileTransaction();
@ -47,9 +50,12 @@ namespace Aurora::IO::FS
bool Complete() override;
bool HasCompletedForGCWI() override;
void CleanupForGCWI() override;
bool CompleteEx(AuUInt completeRoutine, bool bForce = false);
bool Failed() override;
bool HasFailed() override;
AuUInt GetOSErrorCode() override;
AuUInt32 GetLastPacketLength() override;
@ -62,6 +68,9 @@ namespace Aurora::IO::FS
AuSPtr<AuLoop::ILoopSource> NewLoopSource() override;
void Reset() override;
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override;
CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override;
bool IDontWannaUsePorts();
@ -87,5 +96,7 @@ namespace Aurora::IO::FS
AuSPtr<IIOHandle> pHandle_;
AuWPtr<IIOHandle> wpHandle_;
AuSPtr<IAsyncFinishedSubscriber> pSub_;
bool bOwnsEvent_ { true };
AuSPtr<CompletionGroup::ICompletionGroup> pCompletionGroup_;
};
}

View File

@ -222,6 +222,8 @@ namespace Aurora::IO::IPC
{
DWORD size = write.length;
write.outVariable = 0;
TryConnect();
auto h = this->GetPipeHandle();
@ -277,6 +279,8 @@ namespace Aurora::IO::IPC
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)
{
read.outVariable = 0;
auto h = this->GetPipeHandle();
if (h == INVALID_HANDLE_VALUE)
{

View File

@ -37,6 +37,8 @@ namespace Aurora::IO::IPC
EStreamError IPCPipeReader::Read(const Memory::MemoryViewStreamWrite &parameters)
{
parameters.outVariable = 0;
if (this->pParent->Read(parameters, true))
{
return EStreamError::eErrorNone;

View File

@ -32,6 +32,8 @@ namespace Aurora::IO::IPC
EStreamError IPCPipeWriter::Write(const Memory::MemoryViewStreamRead &parameters)
{
parameters.outVariable = 0;
if (this->pParent->Write(parameters))
{
return EStreamError::eErrorNone;

View File

@ -11,6 +11,11 @@
namespace Aurora::IO::Loop
{
LSEvent::LSEvent()
{
}
LSEvent::LSEvent(bool triggered, bool atomicRelease, bool permitMultipleTriggers) : atomicRelease_(atomicRelease)
{
Init(triggered);
@ -30,6 +35,13 @@ namespace Aurora::IO::Loop
}
}
bool TryInit(bool bTriggered, bool bAtomicRelease, bool bPermitMultipleTriggers)
{
this->handle = ::eventfd(bTriggered ? 1 : 0, EFD_NONBLOCK | EFD_CLOEXEC);
this->atomicRelease_ = bAtomicRelease;
return this->HasValidHandle();
}
bool LSEvent::OnTrigger(AuUInt handle)
{
AuUInt64 oldSemaphoreValue {};

View File

@ -1,33 +1,36 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: LSEvent.Linux.hpp
Date: 2022-4-4
Author: Reece
***/
#pragma once
#include "LSHandle.hpp"
namespace Aurora::IO::Loop
{
struct LSEvent : ILSEvent, virtual LSHandle
{
LSEvent(bool triggered, bool atomicRelease, bool permitMultipleTriggers);
LSEvent(int handle, bool triggered, bool atomicRelease);
~LSEvent();
bool Set() override;
bool Reset() override;
virtual bool OnTrigger(AuUInt handle) override;
virtual bool IsSignaled() override;
virtual bool WaitOn(AuUInt32 timeout) override;
virtual ELoopSource GetType() override;
private:
void Init(bool init);
bool IsSignaledNonblocking();
bool atomicRelease_;
};
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: LSEvent.Linux.hpp
Date: 2022-4-4
Author: Reece
***/
#pragma once
#include "LSHandle.hpp"
namespace Aurora::IO::Loop
{
struct LSEvent : ILSEvent, virtual LSHandle
{
LSEvent();
LSEvent(bool triggered, bool atomicRelease, bool permitMultipleTriggers);
LSEvent(int handle, bool triggered, bool atomicRelease);
~LSEvent();
bool Set() override;
bool Reset() override;
bool TryInit(bool bTriggered, bool bAtomicRelease, bool bPermitMultipleTriggers);
virtual bool OnTrigger(AuUInt handle) override;
virtual bool IsSignaled() override;
virtual bool WaitOn(AuUInt32 timeout) override;
virtual ELoopSource GetType() override;
private:
void Init(bool init);
bool IsSignaledNonblocking();
bool atomicRelease_;
};
}

View File

@ -22,6 +22,17 @@ namespace Aurora::IO::Loop
this->UpdateReadHandle((AuUInt)::CreateEventW(NULL, !atomicRelease, triggered, NULL));
}
LSEvent::LSEvent()
{
}
bool LSEvent::TryInit(bool bTriggered, bool bAtomicRelease, bool bPermitMultipleTriggers)
{
this->UpdateReadHandle((AuUInt)::CreateEventW(NULL, !bAtomicRelease, bTriggered, NULL));
return this->HasValidHandle();
}
LSEvent::~LSEvent()
{
auto handle = reinterpret_cast<HANDLE>(this->handle);

View File

@ -12,10 +12,13 @@ namespace Aurora::IO::Loop
{
struct LSEvent : ILSEvent, virtual LSHandle
{
LSEvent();
LSEvent(HANDLE h);
LSEvent(bool triggered, bool atomicRelease, bool permitMultipleTriggers);
~LSEvent();
bool TryInit(bool bTriggered, bool bAtomicRelease, bool bPermitMultipleTriggers);
bool Set() override;
bool Reset() override;

View File

@ -18,7 +18,7 @@ namespace Aurora::IO::Loop
bool TryInit(bool bTriggered, bool bAtomicRelease, bool bPermitMultipleTriggers);
bool IsSignaled() override;
virtual bool IsSignaled() override;
bool WaitOn(AuUInt32 timeout) override;
ELoopSource GetType() override;

View File

@ -162,7 +162,7 @@ namespace Aurora::IO::Net
if (this->pNetReadTransaction)
{
if (this->pNetReadTransaction->Failed())
if (this->pNetReadTransaction->HasFailed())
{
NetError_SetOsError(error, this->pNetReadTransaction->GetOSErrorCode());
bSetError = true;

View File

@ -221,7 +221,7 @@ namespace Aurora::IO::Net
return false;
}
bool LinuxAsyncNetworkTransaction::Failed()
bool LinuxAsyncNetworkTransaction::HasFailed()
{
return this->bHasFailed;
}

View File

@ -1,87 +1,87 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetStream.Linux.hpp
Date: 2022-8-26
Author: Reece
***/
#pragma once
#include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/Loop/ILoopSourceEx.hpp>
#include <Source/IO/Loop/LSHandle.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
#include <Source/IO/FS/Async.Linux.hpp>
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
namespace Aurora::IO::Net
{
struct SocketBase;
struct LinuxAsyncNetworkTransaction : IAsyncTransaction, AuEnableSharedFromThis<LinuxAsyncNetworkTransaction>, Aurora::IO::UNIX::ASubmittable
{
LinuxAsyncNetworkTransaction(SocketBase *pSocket);
~LinuxAsyncNetworkTransaction();
bool StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView) override;
bool StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView) override;
bool Complete() override;
bool CompleteEx(AuUInt completeRoutine);
bool Failed() override;
AuUInt GetOSErrorCode() override;
bool HasCompleted() override;
AuUInt32 GetLastPacketLength() override;
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) override;
bool Wait(AuUInt32 timeout) override;
AuSPtr<AuLoop::ILoopSource> NewLoopSource() override;
void Reset() override;
void SetBaseOffset(AuUInt64 uBaseOffset) override;
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override;
void MakeSyncable();
void ForceNextWriteWait();
bool IDontWannaUsePorts();
void DispatchCb(AuUInt32 len);
int GetSocket();
int GetAlertable();
bool TranslateLastError(bool bReturnValue);
SocketBase * pSocket;
AuSPtr<Loop::ILSEvent> pWaitable;
bool bForceNextWait {};
AuSPtr<LinuxAsyncNetworkTransaction> pPin;
AuUInt32 dwLastAbstractStat {},
dwLastAbstractOffset {},
dwLastBytes {};
bool bIsIrredeemable {};
bool bLatch {};
AuUInt32 dwOsErrorCode {};
AuUInt32 dwRecvFlags {};
AuUInt32 dwSendFlags {};
bool bHasFailed {};
AuSPtr<void> pMemoryHold;
bool bDisallowRecv {};
bool bDisallowSend {};
bool bSendEOSOnce {};
bool bIsWriting {};
bool bDatagramMode {};
AuSPtr<IAsyncFinishedSubscriber> pSub;
int iSocketLength {};
NetEndpoint netEndpoint;
};
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetStream.Linux.hpp
Date: 2022-8-26
Author: Reece
***/
#pragma once
#include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/Loop/ILoopSourceEx.hpp>
#include <Source/IO/Loop/LSHandle.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
#include <Source/IO/FS/Async.Linux.hpp>
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
namespace Aurora::IO::Net
{
struct SocketBase;
struct LinuxAsyncNetworkTransaction : IAsyncTransaction, AuEnableSharedFromThis<LinuxAsyncNetworkTransaction>, Aurora::IO::UNIX::ASubmittable
{
LinuxAsyncNetworkTransaction(SocketBase *pSocket);
~LinuxAsyncNetworkTransaction();
bool StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView) override;
bool StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView) override;
bool Complete() override;
bool CompleteEx(AuUInt completeRoutine);
bool HasFailed() override;
AuUInt GetOSErrorCode() override;
bool HasCompleted() override;
AuUInt32 GetLastPacketLength() override;
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) override;
bool Wait(AuUInt32 timeout) override;
AuSPtr<AuLoop::ILoopSource> NewLoopSource() override;
void Reset() override;
void SetBaseOffset(AuUInt64 uBaseOffset) override;
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override;
void MakeSyncable();
void ForceNextWriteWait();
bool IDontWannaUsePorts();
void DispatchCb(AuUInt32 len);
int GetSocket();
int GetAlertable();
bool TranslateLastError(bool bReturnValue);
SocketBase * pSocket;
AuSPtr<Loop::ILSEvent> pWaitable;
bool bForceNextWait {};
AuSPtr<LinuxAsyncNetworkTransaction> pPin;
AuUInt32 dwLastAbstractStat {},
dwLastAbstractOffset {},
dwLastBytes {};
bool bIsIrredeemable {};
bool bLatch {};
AuUInt32 dwOsErrorCode {};
AuUInt32 dwRecvFlags {};
AuUInt32 dwSendFlags {};
bool bHasFailed {};
AuSPtr<void> pMemoryHold;
bool bDisallowRecv {};
bool bDisallowSend {};
bool bSendEOSOnce {};
bool bIsWriting {};
bool bDatagramMode {};
AuSPtr<IAsyncFinishedSubscriber> pSub;
int iSocketLength {};
NetEndpoint netEndpoint;
};
}

View File

@ -369,7 +369,7 @@ namespace Aurora::IO::Net
return false;
}
bool NtAsyncNetworkTransaction::Failed()
bool NtAsyncNetworkTransaction::HasFailed()
{
return this->bHasFailed &&
this->dwOsErrorCode != ERROR_HANDLE_EOF &&

View File

@ -28,7 +28,7 @@ namespace Aurora::IO::Net
bool Complete() override;
bool Failed() override;
bool HasFailed() override;
bool HasErrorCode();
AuUInt GetOSErrorCode() override;

View File

@ -146,6 +146,8 @@ namespace Aurora::IO::Protocol
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
{
parameters.outVariable = 0;
auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
if (!pCopy ||
@ -309,6 +311,8 @@ namespace Aurora::IO::Protocol
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
{
parameters.outVariable = 0;
auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
if (!pCopy ||
@ -430,6 +434,8 @@ namespace Aurora::IO::Protocol
EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
{
parameters.outVariable = 0;
auto pCopy = AuMakeShared<AuByteBuffer>(parameters);
if (!pCopy ||

View File

@ -36,6 +36,8 @@ namespace Aurora::IO::Protocol
EStreamError SpecialWriter::Write(const Memory::MemoryViewStreamRead &parameters)
{
parameters.outVariable = 0;
auto pStack = this->pParent;
if (pStack->bDead)
{

View File

@ -227,6 +227,8 @@ namespace Aurora::Processes
bool ProcessImpl::Read(EStandardHandle stream, const AuMemoryViewStreamWrite &destination, bool nonblock)
{
destination.outVariable = 0;
if (!IO::EStandardStreamIsValid(stream) || (stream == EStandardHandle::eInputStream))
{
SysPushErrorArg("Invalid Stream");
@ -288,6 +290,8 @@ namespace Aurora::Processes
bool ProcessImpl::Write(const AuMemoryViewStreamRead &source)
{
source.outVariable = 0;
auto handle = this->pipeStdIn_[1];
if (!handle)
{