[+] IO Thread Pool

[+] FileSeekingWriter
[+] ISeekingWriter
[+] AuIO::Adapters::NewAsyncTransactionFromStreamReader
[+] AuIO::Adapters::NewAsyncTransactionFromStreamSeekingReader
[+] AuIO::Adapters::NewAsyncTransactionFromStreamWriter
[+] AuIO::Adapters::NewAsyncTransactionFromStreamSeekingWriter
[+] AuIO::Async::UseSpecifiedWorkerGroup
[+] AuMemory::NewSharableResizableBuffer
[+] AuMemory::NewSharableBuffer
[*] Update comments
This commit is contained in:
Reece Wilson 2024-02-26 16:48:10 +00:00
parent 7dbf564a27
commit 1920f5a8d5
25 changed files with 829 additions and 42 deletions

View File

@ -123,7 +123,7 @@ namespace Aurora::Async
cstatic WorkerPId_t FromLocalAnyInGroup(ThreadGroup_t group) cstatic WorkerPId_t FromLocalAnyInGroup(ThreadGroup_t group)
{ {
auto worker = GetCurrentWorkerPId(); auto worker = GetCurrentWorkerPId();
SysAssertDbg(worker, "No attached to a thread pool"); SysAssertDbg(worker, "Not attached to a thread pool");
return FromJobRunner(worker.GetPool(), group); return FromJobRunner(worker.GetPool(), group);
} }
@ -131,14 +131,14 @@ namespace Aurora::Async
ThreadId_t id) ThreadId_t id)
{ {
auto worker = GetCurrentWorkerPId(); auto worker = GetCurrentWorkerPId();
SysAssertDbg(worker, "No attached to a thread pool"); SysAssertDbg(worker, "Not attached to a thread pool");
return FromJobRunner(worker.GetPool(), group, id); return FromJobRunner(worker.GetPool(), group, id);
} }
cstatic WorkerPId_t FromLocalJobRunner(WorkerId_t id) cstatic WorkerPId_t FromLocalJobRunner(WorkerId_t id)
{ {
auto worker = GetCurrentWorkerPId(); auto worker = GetCurrentWorkerPId();
SysAssertDbg(worker, "No attached to a thread pool"); SysAssertDbg(worker, "Not attached to a thread pool");
return FromJobRunner(worker.GetPool(), id); return FromJobRunner(worker.GetPool(), id);
} }
@ -146,14 +146,14 @@ namespace Aurora::Async
ThreadId_t id) ThreadId_t id)
{ {
auto worker = GetCurrentWorkerPId(); auto worker = GetCurrentWorkerPId();
SysAssertDbg(worker, "No attached to a thread pool"); SysAssertDbg(worker, "Not attached to a thread pool");
return FromJobRunner(worker.GetPool(), group, id); return FromJobRunner(worker.GetPool(), group, id);
} }
cstatic WorkerPId_t FromLocalThreadID(ThreadId_t id) cstatic WorkerPId_t FromLocalThreadID(ThreadId_t id)
{ {
auto worker = GetCurrentWorkerPId(); auto worker = GetCurrentWorkerPId();
SysAssertDbg(worker, "No in thread"); SysAssertDbg(worker, "Not in thread");
return FromJobRunner(worker.GetPool(), worker.GetGroup(), id); return FromJobRunner(worker.GetPool(), worker.GetGroup(), id);
} }

View File

@ -0,0 +1,16 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAdapterAsyncDelegators.hpp
Date: 2024-2-24
Author: Reece
***/
#pragma once
namespace Aurora::IO::Adapters
{
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<Aurora::Async::WorkerPId_t> workers);
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<Aurora::Async::WorkerPId_t> workers);
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers);
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers);
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOAsync.hpp
Date: 2024-2-25
Author: Reece
***/
#pragma once
namespace Aurora::IO::Async
{
AUKN_SYM void UseSpecifiedWorkerGroup(Aurora::Async::WorkerPId_t worker);
}

View File

@ -31,7 +31,7 @@ namespace Aurora::IO::CompletionGroup
virtual AuPair<AuUInt32, AuUInt32> GetStats() = 0; virtual AuPair<AuUInt32, AuUInt32> GetStats() = 0;
virtual void SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks) = 0; virtual void SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks) = 0;
virtual void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) = 0; virtual void AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable) = 0;
virtual AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() = 0; virtual AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() = 0;
virtual void TryTrigger() = 0; virtual void TryTrigger() = 0;

View File

@ -80,6 +80,10 @@ namespace Aurora::IO::FS
/** /**
* @brief Same as WriteFile except fails if the file already exists * @brief Same as WriteFile except fails if the file already exists
* Note the "New" series of functions ensures the data has been flushed via a blocking call.
* If you want further control over when flushing occurs, you should use the open file APIs.
* This is to ensure lazy POSIX-like applications can pass data between applications without
* getting blocked by a partially written file or locked file descriptor.
* @param path * @param path
* @param blob * @param blob
* @return * @return
@ -97,6 +101,10 @@ namespace Aurora::IO::FS
/** /**
* @brief Same as WriteString except fails if the file already exists * @brief Same as WriteString except fails if the file already exists
* Note the "New" series of functions ensures the data has been flushed via a blocking call.
* If you want further control over when flushing occurs, you should use the open file APIs.
* This is to ensure lazy POSIX-like applications can pass data between applications without
* getting blocked by a partially written file or locked file descriptor.
* @param path * @param path
* @param str * @param str
* @return * @return
@ -300,6 +308,7 @@ namespace Aurora::IO::FS
#include "IFileStream.hpp" #include "IFileStream.hpp"
#include "FileStream.hpp" #include "FileStream.hpp"
#include "FileSeekableReader.hpp" #include "FileSeekableReader.hpp"
#include "FileSeekableWriter.hpp"
#include "FileReader.hpp" #include "FileReader.hpp"
#include "FileWriter.hpp" #include "FileWriter.hpp"
#include "Resources.hpp" #include "Resources.hpp"

View File

@ -0,0 +1,73 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: FileSeekableWriter.hpp
Date: 2024-2-25
Author: Reece
***/
#pragma once
namespace Aurora::IO::FS
{
struct FileSeekableWriter : ISeekingWriter
{
AU_NO_COPY_NO_MOVE(FileSeekableWriter)
inline FileSeekableWriter() {}
inline FileSeekableWriter(const AuSPtr<IFileStream> &stream) : stream_(stream) { }
inline ~FileSeekableWriter() {}
template<typename... T>
bool OpenFile(T... args)
{
this->stream_ = OpenWriteShared(args...);
return bool(this->stream_);
}
virtual EStreamError IsOpen() override
{
return this->stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted;
}
virtual EStreamError ArbitraryWrite(AuUInt offset, const Memory::MemoryViewStreamRead &paramters) override
{
if (!this->stream_)
{
return EStreamError::eErrorStreamNotOpen;
}
if (!this->stream_->SetOffset(offset))
{
return EStreamError::eErrorEndOfStream;
}
if (!this->stream_->Write(paramters))
{
return EStreamError::eErrorStreamInterrupted;
}
if (paramters.outVariable == 0)
{
return EStreamError::eErrorEndOfStream;
}
return EStreamError::eErrorNone;
}
virtual void Flush() override
{
if (this->stream_)
{
this->stream_->Flush();
}
}
virtual void Close() override
{
this->stream_.reset();
}
private:
AuSPtr<IFileStream> stream_{};
};
}

View File

@ -35,12 +35,31 @@ namespace Aurora::IO
virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0; virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0;
/** /**
* @brief "Non-blocking" is-signaled and call callback poll routine (similar to nt alertable sleeps of a period zero) * @brief "Non-blocking" is-signaled and call any callback routines (similar to nt alertable sleeps of a period zero)
*/ */
virtual bool Complete() = 0; virtual bool Complete() = 0;
/**
* @brief Block for completion and call any callback routines (similar to nt alertable sleeps)
*/
virtual bool Wait(AuUInt32 uTimeout) = 0;
/** /**
* @brief Non-blocking has-failed (no callbacks, no alert sleep) * @brief Non-blocking has-failed (no callbacks, no alert sleep)
* @warning Also note that we generally try to suppress end of stream "errors"
* @warning If the underlying stream is empty or at the EoS/F point, expect a successful read of zero at least once.
* Further reads may fail.
*
* This design philosophy extends to networking whereby various socket shutdown conditions that most applications would
* consider to be an unsafe shutdown are suppressed as non-fatal. Not that you can't manually query the last error,
* it's just that we shouldn't be generating massive errors logs and presenting stupid warnings to the client, over so
* called "errors" that merely constitute somebodys mother vacuuming over an ethernet cable or a cell tower falling out of
* range. In my estimation, IO errors and other high level errors as presented to an application should only be that of
* logical errors, or that of us bailing out of an unsafe condition, or perhaps the kernel crying about IO resources. Some
* smart-ass networking protocol being aware of an incomplete socket shutdown isn't worth noting, unless the user has
* explicit wishes to log the exact reason of socket shutdown (ie: ISocket::GetError() under ISocketDriver::OnEnd)
*
* Likewise, this reasoning extends to IPC pipes; pipe breakage or other EoS condition should not result in an error.
* @return * @return
*/ */
virtual bool HasFailed() = 0; virtual bool HasFailed() = 0;
@ -68,14 +87,19 @@ namespace Aurora::IO
*/ */
virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) = 0; virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) = 0;
/**
* @brief Block for completion
*/
virtual bool Wait(AuUInt32 uTimeout) = 0;
/** /**
* @brief Provides a loop source that becomes signaled once the transaction is complete. * @brief Provides a loop source that becomes signaled once the transaction is complete.
* Polling the transaction may result in the execution of the callback. * Polling the transaction may result in the execution of the callback.
*
* @warning This function needs refactoring from "New..." to "To..."
* @warning This loop source may be that of the CompletionGroup::ICompletionGroup::ToAnyLoopSource(), if provided via TryAttachToCompletionGroup
* @warning Remember to never provide the same loop source twice to the sam LoopQueue or to sleep twice on the same loop source using AuAsync
*
* To prevent logicial errors, you should only be sleeping on this loop source within your scope rather than delegating it a ILoopQueue or AuAsync to prevent double attaches
* EG: pThat->NewLoopSource()->WaitOn(5 /\*ms*\/);
* This is just a recommendation. If you are yielding on only a few file streams and can guarantee you aren't registering the source twice on a single LoopQueue, it's fine.
*
* If you want to ensure proper batching with minimal IO resources, and safe AuAsync interop, you should probably just use CompletionGroups
*/ */
virtual AuSPtr<IO::Loop::ILoopSource> NewLoopSource() = 0; virtual AuSPtr<IO::Loop::ILoopSource> NewLoopSource() = 0;

View File

@ -12,7 +12,8 @@
#include "IStreamReader.hpp" #include "IStreamReader.hpp"
#include "IStreamWriter.hpp" #include "IStreamWriter.hpp"
#include "ISeekingReader.hpp" // arbitrary read stream, dunno what you would want to call thousands_sep #include "ISeekingReader.hpp"
#include "ISeekingWriter.hpp"
#include "Buffered/Buffered.hpp" #include "Buffered/Buffered.hpp"

View File

@ -54,6 +54,7 @@
#include "Adapters/IOAdapterRandom.hpp" #include "Adapters/IOAdapterRandom.hpp"
#include "Adapters/IOAdapterNOPs.hpp" #include "Adapters/IOAdapterNOPs.hpp"
#include "Adapters/IOAdapterZeros.hpp" #include "Adapters/IOAdapterZeros.hpp"
#include "Adapters/IOAdapterAsyncDelegators.hpp"
#include "IIOWaitableIOTimer.hpp" #include "IIOWaitableIOTimer.hpp"
#include "IIOWaitableIOLoopSource.hpp" #include "IIOWaitableIOLoopSource.hpp"
@ -62,4 +63,6 @@
#include "CompletionGroup/CompletionGroup.hpp" #include "CompletionGroup/CompletionGroup.hpp"
#include "IOWaitableIOCompletionGroup.hpp" #include "IOWaitableIOCompletionGroup.hpp"
#include "Async/IOAsync.hpp"

View File

@ -0,0 +1,19 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ISeekingWriter.hpp
Date: 2024-2-25
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
AUKN_INTERFACE(ISeekingWriter,
AUI_METHOD(EStreamError, IsOpen, ()),
AUI_METHOD(EStreamError, ArbitraryWrite, (AuUInt, uOffset,
const Memory::MemoryViewStreamRead&, writeView)),
AUI_METHOD(void, Flush, ()),
AUI_METHOD(void, Close, ())
);
}

View File

@ -84,7 +84,7 @@ namespace Aurora::IO::Loop
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphore(AuUInt32 uInitialCount = 0); AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphore(AuUInt32 uInitialCount = 0);
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphoreSlow(AuUInt32 uInitialCount = 0); // interop-ready (usable with DbgLoopSourceToReadFd) AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphoreSlow(AuUInt32 uInitialCount = 0); // interop-ready (usable with DbgLoopSourceToReadFd)
AUKN_SYM AuSPtr<ILoopSource> NewLSOSHandle(AuUInt); AUKN_SYM AuSPtr<ILoopSource> NewLSOSHandle(AuUInt);
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid); AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Aurora::Async::WorkerPId_t workerPid);
AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<IO::IAsyncTransaction> &fileTransaction); AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<IO::IAsyncTransaction> &fileTransaction);
AUKN_SYM AuSPtr<ILoopSource> NewStdIn(); AUKN_SYM AuSPtr<ILoopSource> NewStdIn();
AUKN_SYM AuSPtr<ILoopSource> NewLSWin32Source(bool dispatchMessages); AUKN_SYM AuSPtr<ILoopSource> NewLSWin32Source(bool dispatchMessages);

View File

@ -24,4 +24,21 @@ namespace Aurora::IO::Net
AUI_METHOD(void, OnFinalize, ()) AUI_METHOD(void, OnFinalize, ())
); );
/*
* On ISocketDriver::OnEnd() versus ISocketDriver::OnFatalErrorReported() per IAsyncTransaction::HasFailed()
*
* [...] various socket shutdown conditions that most applications would
* consider to be an unsafe shutdown are suppressed as non-fatal. Not that you can't manually query the last error,
* it's just that we shouldn't be generating massive errors logs and presenting stupid warnings to the client, over so
* called "errors" that merely constitute somebodys mother vacuuming over an ethernet cable or a cell tower falling out of
* range. In my estimation, IO errors and other high level errors as presented to an application should only be that of
* logical errors, or that of us bailing out of an unsafe condition, or perhaps the kernel crying about IO resources. Some
* smart-ass networking protocol being aware of an incomplete socket shutdown isn't worth noting, unless the user has
* explicit wishes to log the exact reason of socket shutdown (ie: ISocket::GetError() under ISocketDriver::OnEnd())
*
* Therefore, OnFatalErrorReported() does not report some "critical" errors in transport, unless they're actually worth noting.
* We simply treat them as though the socket is shutting down gracefully for ease of logging.
* In either case, see `ISocket::GetError()` for the exact reason of shutdown.
*/
} }

View File

@ -599,16 +599,6 @@ namespace Aurora::Memory
} }
}; };
static ByteBuffer NewResizableBuffer(AuUInt32 length = 0)
{
return ByteBuffer(length, false, true);
}
static ByteBuffer NewRingBuffer(AuUInt32 length = 1024 * 5)
{
return ByteBuffer(length, true, false);
}
struct SharedByteBuffer : ByteBuffer struct SharedByteBuffer : ByteBuffer
{ {
AuSPtr<void> memory; AuSPtr<void> memory;
@ -681,4 +671,35 @@ namespace Aurora::Memory
return this->ToSharedReadView(); return this->ToSharedReadView();
} }
}; };
static ByteBuffer NewResizableBuffer(AuUInt32 length = 0)
{
return ByteBuffer(length, false, true);
}
static ByteBuffer NewRingBuffer(AuUInt32 length = 1024 * 5)
{
return ByteBuffer(length, true, false);
}
static AuSPtr<SharableByteBuffer> NewSharableResizableBuffer(AuUInt32 length = 0)
{
auto pThat = AuMakeShared<SharableByteBuffer>(length, false, true);
if (!(pThat && *pThat))
{
return {};
}
return pThat;
}
static AuSPtr<SharableByteBuffer> NewSharableBuffer(AuUInt32 length = 0)
{
auto pThat = AuMakeShared<SharableByteBuffer>(length);
if (!(pThat && *pThat))
{
return {};
}
return pThat;
}
} }

View File

@ -0,0 +1,273 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOAdapterAsyncDelegators.cpp
Date: 2024-2-24
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuIOAdapterAsyncDelegators.hpp"
#include <Source/IO/Async/AuIOThreadPool.hpp>
#include <Source/IO/Async/AuIOAPCLessWaitable.hpp>
namespace Aurora::IO::Adapters
{
struct AsyncReaderWriter : IAsyncTransaction,
Async::APCLessWaitable,
AuAsync::IWorkItemHandler
{
AuSPtr<IStreamReader> pStreamReader;
AuSPtr<ISeekingReader> pStreamReaderEx;
AuSPtr<IStreamWriter> pStreamWriter;
AuSPtr<ISeekingWriter> pStreamWriterEx;
AuSPtr<Memory::MemoryViewRead> pReadView;
AuSPtr<Memory::MemoryViewWrite> pWriteView;
AuWorkerPId workers;
void DispatchFrame(ProcessInfo &info) override
{
try
{
if (this->pReadView)
{
if (this->pStreamWriter)
{
this->eStreamError = this->pStreamWriter->Write(AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
}
else if (this->pStreamWriterEx)
{
this->eStreamError = this->pStreamWriterEx->ArbitraryWrite(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
}
}
else if (this->pWriteView)
{
if (this->pStreamReader)
{
this->eStreamError = this->pStreamReader->Read(AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
}
else if (this->pStreamReaderEx)
{
this->eStreamError = this->pStreamReaderEx->ArbitraryRead(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
}
}
}
catch (...)
{
this->eStreamError = EStreamError::eErrorGenericFault;
}
this->bInProgress = false;
this->SignalComplete();
}
void OnFailure()
{
this->bInProgress = false;
this->Reset();
}
bool StartRead(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewWrite> &memoryView) override
{
if (!memoryView)
{
SysPushErrorArg();
return false;
}
if (this->bInProgress)
{
return false;
}
if (!this->pStreamReaderEx && !this->pStreamReader)
{
return false;
}
AuResetMember(this->pReadView);
AuResetMember(this->pWriteView);
this->bInProgress = true;
this->uLastOffset = uOffset;
this->pWriteView = memoryView;
auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
if (!pThat)
{
return false;
}
return pThat->Dispatch();
}
bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) override
{
if (!memoryView)
{
SysPushErrorArg();
return false;
}
if (this->bInProgress)
{
return false;
}
if (!this->pStreamReaderEx && !this->pStreamReader)
{
return false;
}
AuResetMember(this->pReadView);
AuResetMember(this->pWriteView);
this->bInProgress = true;
this->uLastOffset = uOffset;
this->pReadView = memoryView;
auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
if (!pThat)
{
return false;
}
return pThat->Dispatch();
}
AuUInt32 GetLastPacketLength() override
{
return this->uLastLength;
}
void OnOriginThreadComplete() override
{
if (this->pSubscriber)
{
this->pSubscriber->OnAsyncFileOpFinished(this->uBaseOffset + this->uLastOffset, this->GetLastPacketLength());
}
AuResetMember(this->pReadView);
AuResetMember(this->pWriteView);
}
bool Complete() override
{
Async::APCLessWaitable::CheckLocal();
return Async::APCLessWaitable::HasBeenSignaled();
}
bool HasFailed() override
{
return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError != EStreamError::eErrorNone;
}
bool HasCompleted() override
{
return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError == EStreamError::eErrorNone;
}
AuUInt GetOSErrorCode() override
{
return this->HasFailed() ? AuUInt(this->eStreamError) : 0;
}
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) override
{
this->pSubscriber = pSubscriber;
}
bool Wait(AuUInt32 uTimeout) override
{
return NewLoopSource()->WaitOn(uTimeout);
}
AuSPtr<IO::Loop::ILoopSource> NewLoopSource() override
{
return Async::APCLessWaitable::GetLoopSource();
}
void Reset() override
{
if (this->bInProgress)
{
return;
}
AuResetMember(this->pReadView);
AuResetMember(this->pWriteView);
Async::APCLessWaitable::Reset();
}
void SetBaseOffset(AuUInt64 uBaseOffset) override
{
this->uBaseOffset = uBaseOffset;
}
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override
{
return Async::APCLessWaitable::TryAttachToCompletionGroup(pCompletionGroup);
}
CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override
{
return this;
}
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup() override
{
return Async::APCLessWaitable::GetCompletionGroup();
}
private:
AuMutex mutex;
AuSPtr<IAsyncFinishedSubscriber> pSubscriber;
AuUInt64 uBaseOffset {};
AuUInt64 uLastOffset {};
AuUInt uLastLength {};
bool bInProgress {};
EStreamError eStreamError = EStreamError::eErrorNone;
};
static AuWorkerPId GetAuxWorkerPool()
{
return Async::GetAuxWorkerPoolAndRegister();
}
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
{
SysCheckArgNotNull(pStreamReader, {});
auto pObject = AuMakeShared<AsyncReaderWriter>();
SysCheckNotNullMemory(pObject, {});
pObject->pStreamReader = pStreamReader;
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
return pObject;
}
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
{
SysCheckArgNotNull(pStreamReader, {});
auto pObject = AuMakeShared<AsyncReaderWriter>();
SysCheckNotNullMemory(pObject, {});
pObject->pStreamReaderEx = pStreamReader;
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
return pObject;
}
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<AuWorkerPId> workers)
{
SysCheckArgNotNull(pStreamWriter, {});
auto pObject = AuMakeShared<AsyncReaderWriter>();
SysCheckNotNullMemory(pObject, {});
pObject->pStreamWriter = pStreamWriter;
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
return pObject;
}
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers)
{
SysCheckArgNotNull(pStreamWriter, {});
auto pObject = AuMakeShared<AsyncReaderWriter>();
SysCheckNotNullMemory(pObject, {});
pObject->pStreamWriterEx = pStreamWriter;
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
return pObject;
}
}

View File

@ -0,0 +1,12 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOAdapterAsyncDelegators.hpp
Date: 2024-2-24
Author: Reece
***/
#pragma once
namespace Aurora::IO::Adapters
{
}

View File

@ -0,0 +1,130 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOAPCLessWaitable.cpp
Date: 2024-2-25
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuIOAPCLessWaitable.hpp"
namespace Aurora::IO::Async
{
void APCLessEvent::OnFinishSleep()
{
if (auto pThat = AuTryLockMemoryType(this->wpParent))
{
pThat->CheckLocal();
}
}
APCLessWaitable::APCLessWaitable()
{
this->threadId = AuThreads::GetThreadId();
}
AuSPtr<Loop::ILoopSource> APCLessWaitable::GetLoopSource()
{
AU_LOCK_GUARD(this->mutex);
if (this->pCompletionGroup)
{
return this->pCompletionGroup->ToAnyLoopSource();
}
else if (this->pEvent)
{
return this->pEvent;
}
else
{
this->pEvent = AuMakeShared<APCLessEvent>();
SysCheckNotNullMemory(this->pEvent, {});
SysCheckReturn(this->pEvent->TryInit(false, false, true), {});
this->pEvent->wpParent = this->SharedFromThis();
return this->pEvent;
}
}
bool APCLessWaitable::HasCompletedForGCWI()
{
this->CheckLocal();
return this->bHasFinished;
}
void APCLessWaitable::CleanupForGCWI()
{
AuResetMember(this->pCompletionGroup);
}
bool APCLessWaitable::HasBeenSignaled()
{
return this->bHasFinished;
}
void APCLessWaitable::SignalComplete()
{
AU_LOCK_GUARD(this->mutex);
this->bHasFinished = true;
if (this->pEvent)
{
this->pEvent->Set();
}
if (this->pCompletionGroup)
{
if (auto pOtherEvent = this->pCompletionGroup->GetTriggerLoopSource())
{
pOtherEvent->Set();
}
}
}
void APCLessWaitable::Reset()
{
if (this->pEvent)
{
this->pEvent->Reset();
}
this->bHasFinished = false;
}
void APCLessWaitable::CheckLocal()
{
if (!this->bHasFinished)
{
return;
}
if (this->threadId == AuThreads::GetThreadId())
{
this->OnOriginThreadComplete();
}
}
bool APCLessWaitable::TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
{
AU_LOCK_GUARD(this->mutex);
if (!pCompletionGroup)
{
return false;
}
if (!this->pCompletionGroup)
{
return false;
}
this->pCompletionGroup = pCompletionGroup;
return true;
}
AuSPtr<CompletionGroup::ICompletionGroup> APCLessWaitable::GetCompletionGroup()
{
AU_LOCK_GUARD(this->mutex);
return this->pCompletionGroup;
}
}

View File

@ -0,0 +1,53 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOAPCLessWaitable.hpp
Date: 2024-2-25
Author: Reece
***/
#pragma once
#include <Source/IO/Loop/LSEvent.hpp>
namespace Aurora::IO::Async
{
struct APCLessWaitable;
struct APCLessEvent : Loop::LSEvent
{
AuWPtr<APCLessWaitable> wpParent;
void OnFinishSleep() override;
};
struct APCLessWaitable :
CompletionGroup::ICompletionGroupWorkItem,
AuEnableSharedFromThis<APCLessWaitable>
{
APCLessWaitable();
AuSPtr<Loop::ILoopSource> GetLoopSource();
void SignalComplete();
void CheckLocal();
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup);
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup();
void Reset();
virtual void OnOriginThreadComplete() = 0;
virtual bool HasCompletedForGCWI() override;
virtual void CleanupForGCWI() override;
bool HasBeenSignaled();
private:
AuUInt threadId {};
AuSPtr<CompletionGroup::ICompletionGroup> pCompletionGroup;
AuMutex mutex;
AuSPtr<APCLessEvent> pEvent;
bool bHasCalledBack {};
volatile bool bHasFinished {};
};
}

View File

@ -0,0 +1,110 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOThreadPool.cpp
Date: 2024-2-25
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuIOThreadPool.hpp"
namespace Aurora::IO::Async
{
static AuMutex gMutex;
static AuAsync::WorkerPId_t gDefaultGroup;
static AuSPtr<AuAsync::IThreadPool> gThreadPool;
static AuList<AuWPtr<AuAsync::IThreadPool>> gWorkersRegistered;
static AuUInt gJobRunners = 4;
AUKN_SYM void UseSpecifiedWorkerGroup(AuAsync::WorkerPId_t worker)
{
AU_LOCK_GUARD(gMutex);
gDefaultGroup = worker;
}
static AuAsync::WorkerPId_t SpawnDefaultGroup()
{
AuAsync::WorkerPId_t ret;
// TODO: gJobRunners = runtime config?
gThreadPool = AuAsync::NewThreadPool();
if (!gThreadPool)
{
return {};
}
ret = { gThreadPool, 0, AuAsync::kThreadIdAny };
for (AU_ITERATE_N(i, gJobRunners))
{
AuAsync::WorkerPId_t copy = ret;
copy.second = i;
if (!gThreadPool->Spawn(copy))
{
if (i > 1)
{
return ret;
}
else
{
return {};
}
}
}
return ret;
}
static AuAsync::WorkerPId_t GetAuxWorkerPool()
{
if (gDefaultGroup)
{
return gDefaultGroup;
}
else
{
return gDefaultGroup = SpawnDefaultGroup();
}
}
AuAsync::WorkerPId_t GetAuxWorkerPoolAndRegister()
{
AU_LOCK_GUARD(gMutex);
if (auto worker = GetAuxWorkerPool())
{
if (auto selfWorker = AuAsync::GetCurrentWorkerPId())
{
if (gWorkersRegistered.size() > 10)
{
AuRemoveAllIf(gWorkersRegistered, [](AuWPtr<AuAsync::IThreadPool> wpThat)
{
return wpThat.expired();
});
}
for (const auto &wpThat : gWorkersRegistered)
{
if (auto pThat = AuTryLockMemoryType(wpThat))
{
if (pThat == selfWorker.GetPool())
{
return worker;
}
}
}
gWorkersRegistered.push_back(selfWorker.GetPool());
selfWorker.GetPool()->AddDependency(worker.GetPool());
}
return worker;
}
else
{
SysPanic("No async threadpool");
}
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIOThreadPool.hpp
Date: 2024-2-25
Author: Reece
***/
#pragma once
namespace Aurora::IO::Async
{
AuAsync::WorkerPId_t GetAuxWorkerPoolAndRegister();
}

View File

@ -973,7 +973,7 @@ namespace Aurora::IO
return processor; return processor;
} }
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id) AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, AuAsync::WorkerPId_t id)
{ {
if (!id.GetPool()) if (!id.GetPool())
{ {

View File

@ -175,7 +175,7 @@ namespace Aurora::IO::CompletionGroup
} }
} }
void CompletionGroup::AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) void CompletionGroup::AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
{ {
SysAssert(!this->bTerminated, "Completion group already terminated"); SysAssert(!this->bTerminated, "Completion group already terminated");
AU_LOCK_GUARD(this->mutex); AU_LOCK_GUARD(this->mutex);
@ -248,7 +248,7 @@ namespace Aurora::IO::CompletionGroup
this->DoIOTick(true); this->DoIOTick(true);
} }
AuSPtr<Async::IWorkItem> CompletionGroup::OnCompletion() AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnCompletion()
{ {
AU_LOCK_GUARD(this->cs); AU_LOCK_GUARD(this->cs);
@ -299,7 +299,7 @@ namespace Aurora::IO::CompletionGroup
return pRet; return pRet;
} }
AuSPtr<Async::IWorkItem> CompletionGroup::OnSingleCompletion() AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletion()
{ {
AU_LOCK_GUARD(this->cs); AU_LOCK_GUARD(this->cs);

View File

@ -21,8 +21,8 @@ namespace Aurora::IO::CompletionGroup
AuSPtr<Loop::ILoopSource> ToAndLoopSource() override; AuSPtr<Loop::ILoopSource> ToAndLoopSource() override;
AuSPtr<Loop::ILoopSource> ToAnyLoopSource() override; AuSPtr<Loop::ILoopSource> ToAnyLoopSource() override;
AuSPtr<Async::IWorkItem> OnCompletion() override; AuSPtr<AuAsync::IWorkItem> OnCompletion() override;
AuSPtr<Async::IWorkItem> OnSingleCompletion() override; AuSPtr<AuAsync::IWorkItem> OnSingleCompletion() override;
bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS) override; bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS) override;
bool WaitForAllMS(AuUInt32 uTimeoutOrZeroMS) override; bool WaitForAllMS(AuUInt32 uTimeoutOrZeroMS) override;
@ -39,7 +39,7 @@ namespace Aurora::IO::CompletionGroup
void ResetMemoryPins(); void ResetMemoryPins();
bool HasItemsActive(); bool HasItemsActive();
void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) override; void AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable) override;
void AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny); void AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny);
@ -53,8 +53,8 @@ namespace Aurora::IO::CompletionGroup
CompletionQuantumEventProvider andPlsDontAllocateFdIfUntouchedEvent; CompletionQuantumEventProvider andPlsDontAllocateFdIfUntouchedEvent;
AuList<AuSPtr<ICompletionGroupWorkItem>> workItems; AuList<AuSPtr<ICompletionGroupWorkItem>> workItems;
AuSPtr<ICompletionGroupHooks> pCallbacks; AuSPtr<ICompletionGroupHooks> pCallbacks;
AuSPtr<Async::IWorkItem> pAnyBarrier; AuSPtr<AuAsync::IWorkItem> pAnyBarrier;
AuSPtr<Async::IWorkItem> pAndBarrier; AuSPtr<AuAsync::IWorkItem> pAndBarrier;
AuList<AuPair<AuSPtr<IIOProcessorManualInvoker>, bool>> callbackTicks; AuList<AuPair<AuSPtr<IIOProcessorManualInvoker>, bool>> callbackTicks;
AuUInt32 uAdded {}; AuUInt32 uAdded {};
AuUInt32 uTriggered {}; AuUInt32 uTriggered {};

View File

@ -12,16 +12,16 @@
namespace Aurora::IO::CompletionGroup namespace Aurora::IO::CompletionGroup
{ {
CompletionGroupAndedIOWorkItem::CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner, CompletionGroupAndedIOWorkItem::CompletionGroupAndedIOWorkItem(AuAsync::IThreadPoolInternal *owner,
const AuWorkerID &worker, const AuWorkerID &worker,
AuSPtr<CompletionGroup> pParent) : AuSPtr<CompletionGroup> pParent) :
Async::WorkItem(owner, worker, nullptr), AuAsync::WorkItem(owner, worker, nullptr),
pParent(pParent) pParent(pParent)
{ {
} }
void CompletionGroupAndedIOWorkItem::DispatchTask(Async::IWorkItemHandler::ProcessInfo &info) void CompletionGroupAndedIOWorkItem::DispatchTask(AuAsync::IWorkItemHandler::ProcessInfo &info)
{ {
this->pParent->DoIOTick(false); this->pParent->DoIOTick(false);

View File

@ -11,13 +11,13 @@
namespace Aurora::IO::CompletionGroup namespace Aurora::IO::CompletionGroup
{ {
struct CompletionGroupAndedIOWorkItem : Async::WorkItem struct CompletionGroupAndedIOWorkItem : AuAsync::WorkItem
{ {
CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner, CompletionGroupAndedIOWorkItem(AuAsync::IThreadPoolInternal *owner,
const AuWorkerID &worker, const AuWorkerID &worker,
AuSPtr<CompletionGroup> pParent); AuSPtr<CompletionGroup> pParent);
void DispatchTask(Async::IWorkItemHandler::ProcessInfo &info) override; void DispatchTask(AuAsync::IWorkItemHandler::ProcessInfo &info) override;
AuSPtr<CompletionGroup> pParent; AuSPtr<CompletionGroup> pParent;
}; };

View File

@ -42,11 +42,11 @@ namespace Aurora::IO::Loop
return AuConsole::StdInBufferLoopSource(); return AuConsole::StdInBufferLoopSource();
} }
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid) AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(AuAsync::WorkerPId_t workerPid)
{ {
if (!workerPid) if (!workerPid)
{ {
return Async::GetAsyncApp()->WorkerToLoopSource(workerPid); return AuAsync::GetAsyncApp()->WorkerToLoopSource(workerPid);
} }
return workerPid.GetPool()->WorkerToLoopSource(workerPid); return workerPid.GetPool()->WorkerToLoopSource(workerPid);