[+] IO Thread Pool
[+] FileSeekingWriter [+] ISeekingWriter [+] AuIO::Adapters::NewAsyncTransactionFromStreamReader [+] AuIO::Adapters::NewAsyncTransactionFromStreamSeekingReader [+] AuIO::Adapters::NewAsyncTransactionFromStreamWriter [+] AuIO::Adapters::NewAsyncTransactionFromStreamSeekingWriter [+] AuIO::Async::UseSpecifiedWorkerGroup [+] AuMemory::NewSharableResizableBuffer [+] AuMemory::NewSharableBuffer [*] Update comments
This commit is contained in:
parent
7dbf564a27
commit
1920f5a8d5
@ -123,7 +123,7 @@ namespace Aurora::Async
|
|||||||
cstatic WorkerPId_t FromLocalAnyInGroup(ThreadGroup_t group)
|
cstatic WorkerPId_t FromLocalAnyInGroup(ThreadGroup_t group)
|
||||||
{
|
{
|
||||||
auto worker = GetCurrentWorkerPId();
|
auto worker = GetCurrentWorkerPId();
|
||||||
SysAssertDbg(worker, "No attached to a thread pool");
|
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||||
return FromJobRunner(worker.GetPool(), group);
|
return FromJobRunner(worker.GetPool(), group);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,14 +131,14 @@ namespace Aurora::Async
|
|||||||
ThreadId_t id)
|
ThreadId_t id)
|
||||||
{
|
{
|
||||||
auto worker = GetCurrentWorkerPId();
|
auto worker = GetCurrentWorkerPId();
|
||||||
SysAssertDbg(worker, "No attached to a thread pool");
|
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||||
return FromJobRunner(worker.GetPool(), group, id);
|
return FromJobRunner(worker.GetPool(), group, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
cstatic WorkerPId_t FromLocalJobRunner(WorkerId_t id)
|
cstatic WorkerPId_t FromLocalJobRunner(WorkerId_t id)
|
||||||
{
|
{
|
||||||
auto worker = GetCurrentWorkerPId();
|
auto worker = GetCurrentWorkerPId();
|
||||||
SysAssertDbg(worker, "No attached to a thread pool");
|
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||||
return FromJobRunner(worker.GetPool(), id);
|
return FromJobRunner(worker.GetPool(), id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,14 +146,14 @@ namespace Aurora::Async
|
|||||||
ThreadId_t id)
|
ThreadId_t id)
|
||||||
{
|
{
|
||||||
auto worker = GetCurrentWorkerPId();
|
auto worker = GetCurrentWorkerPId();
|
||||||
SysAssertDbg(worker, "No attached to a thread pool");
|
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||||
return FromJobRunner(worker.GetPool(), group, id);
|
return FromJobRunner(worker.GetPool(), group, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
cstatic WorkerPId_t FromLocalThreadID(ThreadId_t id)
|
cstatic WorkerPId_t FromLocalThreadID(ThreadId_t id)
|
||||||
{
|
{
|
||||||
auto worker = GetCurrentWorkerPId();
|
auto worker = GetCurrentWorkerPId();
|
||||||
SysAssertDbg(worker, "No in thread");
|
SysAssertDbg(worker, "Not in thread");
|
||||||
return FromJobRunner(worker.GetPool(), worker.GetGroup(), id);
|
return FromJobRunner(worker.GetPool(), worker.GetGroup(), id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
16
Include/Aurora/IO/Adapters/IOAdapterAsyncDelegators.hpp
Normal file
16
Include/Aurora/IO/Adapters/IOAdapterAsyncDelegators.hpp
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: IOAdapterAsyncDelegators.hpp
|
||||||
|
Date: 2024-2-24
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::IO::Adapters
|
||||||
|
{
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||||
|
}
|
13
Include/Aurora/IO/Async/IOAsync.hpp
Normal file
13
Include/Aurora/IO/Async/IOAsync.hpp
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: IOAsync.hpp
|
||||||
|
Date: 2024-2-25
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::IO::Async
|
||||||
|
{
|
||||||
|
AUKN_SYM void UseSpecifiedWorkerGroup(Aurora::Async::WorkerPId_t worker);
|
||||||
|
}
|
@ -31,7 +31,7 @@ namespace Aurora::IO::CompletionGroup
|
|||||||
virtual AuPair<AuUInt32, AuUInt32> GetStats() = 0;
|
virtual AuPair<AuUInt32, AuUInt32> GetStats() = 0;
|
||||||
virtual void SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks) = 0;
|
virtual void SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks) = 0;
|
||||||
|
|
||||||
virtual void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) = 0;
|
virtual void AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable) = 0;
|
||||||
|
|
||||||
virtual AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() = 0;
|
virtual AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() = 0;
|
||||||
virtual void TryTrigger() = 0;
|
virtual void TryTrigger() = 0;
|
||||||
|
@ -80,6 +80,10 @@ namespace Aurora::IO::FS
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Same as WriteFile except fails if the file already exists
|
* @brief Same as WriteFile except fails if the file already exists
|
||||||
|
* Note the "New" series of functions ensures the data has been flushed via a blocking call.
|
||||||
|
* If you want further control over when flushing occurs, you should use the open file APIs.
|
||||||
|
* This is to ensure lazy POSIX-like applications can pass data between applications without
|
||||||
|
* getting blocked by a partially written file or locked file descriptor.
|
||||||
* @param path
|
* @param path
|
||||||
* @param blob
|
* @param blob
|
||||||
* @return
|
* @return
|
||||||
@ -97,6 +101,10 @@ namespace Aurora::IO::FS
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Same as WriteString except fails if the file already exists
|
* @brief Same as WriteString except fails if the file already exists
|
||||||
|
* Note the "New" series of functions ensures the data has been flushed via a blocking call.
|
||||||
|
* If you want further control over when flushing occurs, you should use the open file APIs.
|
||||||
|
* This is to ensure lazy POSIX-like applications can pass data between applications without
|
||||||
|
* getting blocked by a partially written file or locked file descriptor.
|
||||||
* @param path
|
* @param path
|
||||||
* @param str
|
* @param str
|
||||||
* @return
|
* @return
|
||||||
@ -300,6 +308,7 @@ namespace Aurora::IO::FS
|
|||||||
#include "IFileStream.hpp"
|
#include "IFileStream.hpp"
|
||||||
#include "FileStream.hpp"
|
#include "FileStream.hpp"
|
||||||
#include "FileSeekableReader.hpp"
|
#include "FileSeekableReader.hpp"
|
||||||
|
#include "FileSeekableWriter.hpp"
|
||||||
#include "FileReader.hpp"
|
#include "FileReader.hpp"
|
||||||
#include "FileWriter.hpp"
|
#include "FileWriter.hpp"
|
||||||
#include "Resources.hpp"
|
#include "Resources.hpp"
|
||||||
|
73
Include/Aurora/IO/FS/FileSeekableWriter.hpp
Normal file
73
Include/Aurora/IO/FS/FileSeekableWriter.hpp
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: FileSeekableWriter.hpp
|
||||||
|
Date: 2024-2-25
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::IO::FS
|
||||||
|
{
|
||||||
|
struct FileSeekableWriter : ISeekingWriter
|
||||||
|
{
|
||||||
|
AU_NO_COPY_NO_MOVE(FileSeekableWriter)
|
||||||
|
|
||||||
|
inline FileSeekableWriter() {}
|
||||||
|
inline FileSeekableWriter(const AuSPtr<IFileStream> &stream) : stream_(stream) { }
|
||||||
|
inline ~FileSeekableWriter() {}
|
||||||
|
|
||||||
|
template<typename... T>
|
||||||
|
bool OpenFile(T... args)
|
||||||
|
{
|
||||||
|
this->stream_ = OpenWriteShared(args...);
|
||||||
|
return bool(this->stream_);
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual EStreamError IsOpen() override
|
||||||
|
{
|
||||||
|
return this->stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual EStreamError ArbitraryWrite(AuUInt offset, const Memory::MemoryViewStreamRead ¶mters) override
|
||||||
|
{
|
||||||
|
if (!this->stream_)
|
||||||
|
{
|
||||||
|
return EStreamError::eErrorStreamNotOpen;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->stream_->SetOffset(offset))
|
||||||
|
{
|
||||||
|
return EStreamError::eErrorEndOfStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->stream_->Write(paramters))
|
||||||
|
{
|
||||||
|
return EStreamError::eErrorStreamInterrupted;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (paramters.outVariable == 0)
|
||||||
|
{
|
||||||
|
return EStreamError::eErrorEndOfStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
return EStreamError::eErrorNone;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void Flush() override
|
||||||
|
{
|
||||||
|
if (this->stream_)
|
||||||
|
{
|
||||||
|
this->stream_->Flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void Close() override
|
||||||
|
{
|
||||||
|
this->stream_.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
AuSPtr<IFileStream> stream_{};
|
||||||
|
};
|
||||||
|
}
|
@ -35,12 +35,31 @@ namespace Aurora::IO
|
|||||||
virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0;
|
virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief "Non-blocking" is-signaled and call callback poll routine (similar to nt alertable sleeps of a period zero)
|
* @brief "Non-blocking" is-signaled and call any callback routines (similar to nt alertable sleeps of a period zero)
|
||||||
*/
|
*/
|
||||||
virtual bool Complete() = 0;
|
virtual bool Complete() = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Block for completion and call any callback routines (similar to nt alertable sleeps)
|
||||||
|
*/
|
||||||
|
virtual bool Wait(AuUInt32 uTimeout) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Non-blocking has-failed (no callbacks, no alert sleep)
|
* @brief Non-blocking has-failed (no callbacks, no alert sleep)
|
||||||
|
* @warning Also note that we generally try to suppress end of stream "errors"
|
||||||
|
* @warning If the underlying stream is empty or at the EoS/F point, expect a successful read of zero at least once.
|
||||||
|
* Further reads may fail.
|
||||||
|
*
|
||||||
|
* This design philosophy extends to networking whereby various socket shutdown conditions that most applications would
|
||||||
|
* consider to be an unsafe shutdown are suppressed as non-fatal. Not that you can't manually query the last error,
|
||||||
|
* it's just that we shouldn't be generating massive errors logs and presenting stupid warnings to the client, over so
|
||||||
|
* called "errors" that merely constitute somebodys mother vacuuming over an ethernet cable or a cell tower falling out of
|
||||||
|
* range. In my estimation, IO errors and other high level errors as presented to an application should only be that of
|
||||||
|
* logical errors, or that of us bailing out of an unsafe condition, or perhaps the kernel crying about IO resources. Some
|
||||||
|
* smart-ass networking protocol being aware of an incomplete socket shutdown isn't worth noting, unless the user has
|
||||||
|
* explicit wishes to log the exact reason of socket shutdown (ie: ISocket::GetError() under ISocketDriver::OnEnd)
|
||||||
|
*
|
||||||
|
* Likewise, this reasoning extends to IPC pipes; pipe breakage or other EoS condition should not result in an error.
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
virtual bool HasFailed() = 0;
|
virtual bool HasFailed() = 0;
|
||||||
@ -68,14 +87,19 @@ namespace Aurora::IO
|
|||||||
*/
|
*/
|
||||||
virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) = 0;
|
virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) = 0;
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Block for completion
|
|
||||||
*/
|
|
||||||
virtual bool Wait(AuUInt32 uTimeout) = 0;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Provides a loop source that becomes signaled once the transaction is complete.
|
* @brief Provides a loop source that becomes signaled once the transaction is complete.
|
||||||
* Polling the transaction may result in the execution of the callback.
|
* Polling the transaction may result in the execution of the callback.
|
||||||
|
*
|
||||||
|
* @warning This function needs refactoring from "New..." to "To..."
|
||||||
|
* @warning This loop source may be that of the CompletionGroup::ICompletionGroup::ToAnyLoopSource(), if provided via TryAttachToCompletionGroup
|
||||||
|
* @warning Remember to never provide the same loop source twice to the sam LoopQueue or to sleep twice on the same loop source using AuAsync
|
||||||
|
*
|
||||||
|
* To prevent logicial errors, you should only be sleeping on this loop source within your scope rather than delegating it a ILoopQueue or AuAsync to prevent double attaches
|
||||||
|
* EG: pThat->NewLoopSource()->WaitOn(5 /\*ms*\/);
|
||||||
|
* This is just a recommendation. If you are yielding on only a few file streams and can guarantee you aren't registering the source twice on a single LoopQueue, it's fine.
|
||||||
|
*
|
||||||
|
* If you want to ensure proper batching with minimal IO resources, and safe AuAsync interop, you should probably just use CompletionGroups
|
||||||
*/
|
*/
|
||||||
virtual AuSPtr<IO::Loop::ILoopSource> NewLoopSource() = 0;
|
virtual AuSPtr<IO::Loop::ILoopSource> NewLoopSource() = 0;
|
||||||
|
|
||||||
|
@ -12,7 +12,8 @@
|
|||||||
|
|
||||||
#include "IStreamReader.hpp"
|
#include "IStreamReader.hpp"
|
||||||
#include "IStreamWriter.hpp"
|
#include "IStreamWriter.hpp"
|
||||||
#include "ISeekingReader.hpp" // arbitrary read stream, dunno what you would want to call thousands_sep
|
#include "ISeekingReader.hpp"
|
||||||
|
#include "ISeekingWriter.hpp"
|
||||||
|
|
||||||
#include "Buffered/Buffered.hpp"
|
#include "Buffered/Buffered.hpp"
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
#include "Adapters/IOAdapterRandom.hpp"
|
#include "Adapters/IOAdapterRandom.hpp"
|
||||||
#include "Adapters/IOAdapterNOPs.hpp"
|
#include "Adapters/IOAdapterNOPs.hpp"
|
||||||
#include "Adapters/IOAdapterZeros.hpp"
|
#include "Adapters/IOAdapterZeros.hpp"
|
||||||
|
#include "Adapters/IOAdapterAsyncDelegators.hpp"
|
||||||
|
|
||||||
#include "IIOWaitableIOTimer.hpp"
|
#include "IIOWaitableIOTimer.hpp"
|
||||||
#include "IIOWaitableIOLoopSource.hpp"
|
#include "IIOWaitableIOLoopSource.hpp"
|
||||||
@ -62,4 +63,6 @@
|
|||||||
|
|
||||||
#include "CompletionGroup/CompletionGroup.hpp"
|
#include "CompletionGroup/CompletionGroup.hpp"
|
||||||
|
|
||||||
#include "IOWaitableIOCompletionGroup.hpp"
|
#include "IOWaitableIOCompletionGroup.hpp"
|
||||||
|
|
||||||
|
#include "Async/IOAsync.hpp"
|
19
Include/Aurora/IO/ISeekingWriter.hpp
Normal file
19
Include/Aurora/IO/ISeekingWriter.hpp
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: ISeekingWriter.hpp
|
||||||
|
Date: 2024-2-25
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::IO
|
||||||
|
{
|
||||||
|
AUKN_INTERFACE(ISeekingWriter,
|
||||||
|
AUI_METHOD(EStreamError, IsOpen, ()),
|
||||||
|
AUI_METHOD(EStreamError, ArbitraryWrite, (AuUInt, uOffset,
|
||||||
|
const Memory::MemoryViewStreamRead&, writeView)),
|
||||||
|
AUI_METHOD(void, Flush, ()),
|
||||||
|
AUI_METHOD(void, Close, ())
|
||||||
|
);
|
||||||
|
}
|
@ -84,7 +84,7 @@ namespace Aurora::IO::Loop
|
|||||||
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphore(AuUInt32 uInitialCount = 0);
|
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphore(AuUInt32 uInitialCount = 0);
|
||||||
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphoreSlow(AuUInt32 uInitialCount = 0); // interop-ready (usable with DbgLoopSourceToReadFd)
|
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphoreSlow(AuUInt32 uInitialCount = 0); // interop-ready (usable with DbgLoopSourceToReadFd)
|
||||||
AUKN_SYM AuSPtr<ILoopSource> NewLSOSHandle(AuUInt);
|
AUKN_SYM AuSPtr<ILoopSource> NewLSOSHandle(AuUInt);
|
||||||
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid);
|
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Aurora::Async::WorkerPId_t workerPid);
|
||||||
AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<IO::IAsyncTransaction> &fileTransaction);
|
AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<IO::IAsyncTransaction> &fileTransaction);
|
||||||
AUKN_SYM AuSPtr<ILoopSource> NewStdIn();
|
AUKN_SYM AuSPtr<ILoopSource> NewStdIn();
|
||||||
AUKN_SYM AuSPtr<ILoopSource> NewLSWin32Source(bool dispatchMessages);
|
AUKN_SYM AuSPtr<ILoopSource> NewLSWin32Source(bool dispatchMessages);
|
||||||
|
@ -24,4 +24,21 @@ namespace Aurora::IO::Net
|
|||||||
|
|
||||||
AUI_METHOD(void, OnFinalize, ())
|
AUI_METHOD(void, OnFinalize, ())
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* On ISocketDriver::OnEnd() versus ISocketDriver::OnFatalErrorReported() per IAsyncTransaction::HasFailed()
|
||||||
|
*
|
||||||
|
* [...] various socket shutdown conditions that most applications would
|
||||||
|
* consider to be an unsafe shutdown are suppressed as non-fatal. Not that you can't manually query the last error,
|
||||||
|
* it's just that we shouldn't be generating massive errors logs and presenting stupid warnings to the client, over so
|
||||||
|
* called "errors" that merely constitute somebodys mother vacuuming over an ethernet cable or a cell tower falling out of
|
||||||
|
* range. In my estimation, IO errors and other high level errors as presented to an application should only be that of
|
||||||
|
* logical errors, or that of us bailing out of an unsafe condition, or perhaps the kernel crying about IO resources. Some
|
||||||
|
* smart-ass networking protocol being aware of an incomplete socket shutdown isn't worth noting, unless the user has
|
||||||
|
* explicit wishes to log the exact reason of socket shutdown (ie: ISocket::GetError() under ISocketDriver::OnEnd())
|
||||||
|
*
|
||||||
|
* Therefore, OnFatalErrorReported() does not report some "critical" errors in transport, unless they're actually worth noting.
|
||||||
|
* We simply treat them as though the socket is shutting down gracefully for ease of logging.
|
||||||
|
* In either case, see `ISocket::GetError()` for the exact reason of shutdown.
|
||||||
|
*/
|
||||||
}
|
}
|
@ -599,16 +599,6 @@ namespace Aurora::Memory
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
static ByteBuffer NewResizableBuffer(AuUInt32 length = 0)
|
|
||||||
{
|
|
||||||
return ByteBuffer(length, false, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
static ByteBuffer NewRingBuffer(AuUInt32 length = 1024 * 5)
|
|
||||||
{
|
|
||||||
return ByteBuffer(length, true, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SharedByteBuffer : ByteBuffer
|
struct SharedByteBuffer : ByteBuffer
|
||||||
{
|
{
|
||||||
AuSPtr<void> memory;
|
AuSPtr<void> memory;
|
||||||
@ -681,4 +671,35 @@ namespace Aurora::Memory
|
|||||||
return this->ToSharedReadView();
|
return this->ToSharedReadView();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static ByteBuffer NewResizableBuffer(AuUInt32 length = 0)
|
||||||
|
{
|
||||||
|
return ByteBuffer(length, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
static ByteBuffer NewRingBuffer(AuUInt32 length = 1024 * 5)
|
||||||
|
{
|
||||||
|
return ByteBuffer(length, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
static AuSPtr<SharableByteBuffer> NewSharableResizableBuffer(AuUInt32 length = 0)
|
||||||
|
{
|
||||||
|
auto pThat = AuMakeShared<SharableByteBuffer>(length, false, true);
|
||||||
|
if (!(pThat && *pThat))
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
return pThat;
|
||||||
|
}
|
||||||
|
|
||||||
|
static AuSPtr<SharableByteBuffer> NewSharableBuffer(AuUInt32 length = 0)
|
||||||
|
{
|
||||||
|
auto pThat = AuMakeShared<SharableByteBuffer>(length);
|
||||||
|
if (!(pThat && *pThat))
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
return pThat;
|
||||||
|
}
|
||||||
}
|
}
|
273
Source/IO/Adapters/AuIOAdapterAsyncDelegators.cpp
Normal file
273
Source/IO/Adapters/AuIOAdapterAsyncDelegators.cpp
Normal file
@ -0,0 +1,273 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuIOAdapterAsyncDelegators.cpp
|
||||||
|
Date: 2024-2-24
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuIOAdapterAsyncDelegators.hpp"
|
||||||
|
#include <Source/IO/Async/AuIOThreadPool.hpp>
|
||||||
|
#include <Source/IO/Async/AuIOAPCLessWaitable.hpp>
|
||||||
|
|
||||||
|
namespace Aurora::IO::Adapters
|
||||||
|
{
|
||||||
|
struct AsyncReaderWriter : IAsyncTransaction,
|
||||||
|
Async::APCLessWaitable,
|
||||||
|
AuAsync::IWorkItemHandler
|
||||||
|
{
|
||||||
|
AuSPtr<IStreamReader> pStreamReader;
|
||||||
|
AuSPtr<ISeekingReader> pStreamReaderEx;
|
||||||
|
AuSPtr<IStreamWriter> pStreamWriter;
|
||||||
|
AuSPtr<ISeekingWriter> pStreamWriterEx;
|
||||||
|
AuSPtr<Memory::MemoryViewRead> pReadView;
|
||||||
|
AuSPtr<Memory::MemoryViewWrite> pWriteView;
|
||||||
|
AuWorkerPId workers;
|
||||||
|
|
||||||
|
void DispatchFrame(ProcessInfo &info) override
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (this->pReadView)
|
||||||
|
{
|
||||||
|
if (this->pStreamWriter)
|
||||||
|
{
|
||||||
|
this->eStreamError = this->pStreamWriter->Write(AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
|
||||||
|
}
|
||||||
|
else if (this->pStreamWriterEx)
|
||||||
|
{
|
||||||
|
this->eStreamError = this->pStreamWriterEx->ArbitraryWrite(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (this->pWriteView)
|
||||||
|
{
|
||||||
|
if (this->pStreamReader)
|
||||||
|
{
|
||||||
|
this->eStreamError = this->pStreamReader->Read(AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
|
||||||
|
}
|
||||||
|
else if (this->pStreamReaderEx)
|
||||||
|
{
|
||||||
|
this->eStreamError = this->pStreamReaderEx->ArbitraryRead(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
this->eStreamError = EStreamError::eErrorGenericFault;
|
||||||
|
}
|
||||||
|
|
||||||
|
this->bInProgress = false;
|
||||||
|
this->SignalComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
void OnFailure()
|
||||||
|
{
|
||||||
|
this->bInProgress = false;
|
||||||
|
this->Reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool StartRead(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewWrite> &memoryView) override
|
||||||
|
{
|
||||||
|
if (!memoryView)
|
||||||
|
{
|
||||||
|
SysPushErrorArg();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this->bInProgress)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->pStreamReaderEx && !this->pStreamReader)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuResetMember(this->pReadView);
|
||||||
|
AuResetMember(this->pWriteView);
|
||||||
|
this->bInProgress = true;
|
||||||
|
this->uLastOffset = uOffset;
|
||||||
|
this->pWriteView = memoryView;
|
||||||
|
|
||||||
|
auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
|
||||||
|
if (!pThat)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pThat->Dispatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) override
|
||||||
|
{
|
||||||
|
if (!memoryView)
|
||||||
|
{
|
||||||
|
SysPushErrorArg();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this->bInProgress)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->pStreamReaderEx && !this->pStreamReader)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuResetMember(this->pReadView);
|
||||||
|
AuResetMember(this->pWriteView);
|
||||||
|
this->bInProgress = true;
|
||||||
|
this->uLastOffset = uOffset;
|
||||||
|
this->pReadView = memoryView;
|
||||||
|
|
||||||
|
auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
|
||||||
|
if (!pThat)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pThat->Dispatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
AuUInt32 GetLastPacketLength() override
|
||||||
|
{
|
||||||
|
return this->uLastLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
void OnOriginThreadComplete() override
|
||||||
|
{
|
||||||
|
if (this->pSubscriber)
|
||||||
|
{
|
||||||
|
this->pSubscriber->OnAsyncFileOpFinished(this->uBaseOffset + this->uLastOffset, this->GetLastPacketLength());
|
||||||
|
}
|
||||||
|
|
||||||
|
AuResetMember(this->pReadView);
|
||||||
|
AuResetMember(this->pWriteView);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Complete() override
|
||||||
|
{
|
||||||
|
Async::APCLessWaitable::CheckLocal();
|
||||||
|
return Async::APCLessWaitable::HasBeenSignaled();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HasFailed() override
|
||||||
|
{
|
||||||
|
return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError != EStreamError::eErrorNone;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HasCompleted() override
|
||||||
|
{
|
||||||
|
return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError == EStreamError::eErrorNone;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuUInt GetOSErrorCode() override
|
||||||
|
{
|
||||||
|
return this->HasFailed() ? AuUInt(this->eStreamError) : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) override
|
||||||
|
{
|
||||||
|
this->pSubscriber = pSubscriber;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Wait(AuUInt32 uTimeout) override
|
||||||
|
{
|
||||||
|
return NewLoopSource()->WaitOn(uTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<IO::Loop::ILoopSource> NewLoopSource() override
|
||||||
|
{
|
||||||
|
return Async::APCLessWaitable::GetLoopSource();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Reset() override
|
||||||
|
{
|
||||||
|
if (this->bInProgress)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuResetMember(this->pReadView);
|
||||||
|
AuResetMember(this->pWriteView);
|
||||||
|
Async::APCLessWaitable::Reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetBaseOffset(AuUInt64 uBaseOffset) override
|
||||||
|
{
|
||||||
|
this->uBaseOffset = uBaseOffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override
|
||||||
|
{
|
||||||
|
return Async::APCLessWaitable::TryAttachToCompletionGroup(pCompletionGroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override
|
||||||
|
{
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup() override
|
||||||
|
{
|
||||||
|
return Async::APCLessWaitable::GetCompletionGroup();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
AuMutex mutex;
|
||||||
|
AuSPtr<IAsyncFinishedSubscriber> pSubscriber;
|
||||||
|
AuUInt64 uBaseOffset {};
|
||||||
|
AuUInt64 uLastOffset {};
|
||||||
|
AuUInt uLastLength {};
|
||||||
|
bool bInProgress {};
|
||||||
|
EStreamError eStreamError = EStreamError::eErrorNone;
|
||||||
|
};
|
||||||
|
|
||||||
|
static AuWorkerPId GetAuxWorkerPool()
|
||||||
|
{
|
||||||
|
return Async::GetAuxWorkerPoolAndRegister();
|
||||||
|
}
|
||||||
|
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
|
||||||
|
{
|
||||||
|
SysCheckArgNotNull(pStreamReader, {});
|
||||||
|
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||||
|
SysCheckNotNullMemory(pObject, {});
|
||||||
|
pObject->pStreamReader = pStreamReader;
|
||||||
|
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||||
|
return pObject;
|
||||||
|
}
|
||||||
|
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
|
||||||
|
{
|
||||||
|
SysCheckArgNotNull(pStreamReader, {});
|
||||||
|
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||||
|
SysCheckNotNullMemory(pObject, {});
|
||||||
|
pObject->pStreamReaderEx = pStreamReader;
|
||||||
|
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||||
|
return pObject;
|
||||||
|
}
|
||||||
|
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<AuWorkerPId> workers)
|
||||||
|
{
|
||||||
|
SysCheckArgNotNull(pStreamWriter, {});
|
||||||
|
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||||
|
SysCheckNotNullMemory(pObject, {});
|
||||||
|
pObject->pStreamWriter = pStreamWriter;
|
||||||
|
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||||
|
return pObject;
|
||||||
|
}
|
||||||
|
|
||||||
|
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers)
|
||||||
|
{
|
||||||
|
SysCheckArgNotNull(pStreamWriter, {});
|
||||||
|
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||||
|
SysCheckNotNullMemory(pObject, {});
|
||||||
|
pObject->pStreamWriterEx = pStreamWriter;
|
||||||
|
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||||
|
return pObject;
|
||||||
|
}
|
||||||
|
}
|
12
Source/IO/Adapters/AuIOAdapterAsyncDelegators.hpp
Normal file
12
Source/IO/Adapters/AuIOAdapterAsyncDelegators.hpp
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuIOAdapterAsyncDelegators.hpp
|
||||||
|
Date: 2024-2-24
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::IO::Adapters
|
||||||
|
{
|
||||||
|
}
|
130
Source/IO/Async/AuIOAPCLessWaitable.cpp
Normal file
130
Source/IO/Async/AuIOAPCLessWaitable.cpp
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuIOAPCLessWaitable.cpp
|
||||||
|
Date: 2024-2-25
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuIOAPCLessWaitable.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::IO::Async
|
||||||
|
{
|
||||||
|
void APCLessEvent::OnFinishSleep()
|
||||||
|
{
|
||||||
|
if (auto pThat = AuTryLockMemoryType(this->wpParent))
|
||||||
|
{
|
||||||
|
pThat->CheckLocal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
APCLessWaitable::APCLessWaitable()
|
||||||
|
{
|
||||||
|
this->threadId = AuThreads::GetThreadId();
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<Loop::ILoopSource> APCLessWaitable::GetLoopSource()
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
|
||||||
|
if (this->pCompletionGroup)
|
||||||
|
{
|
||||||
|
return this->pCompletionGroup->ToAnyLoopSource();
|
||||||
|
}
|
||||||
|
else if (this->pEvent)
|
||||||
|
{
|
||||||
|
return this->pEvent;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
this->pEvent = AuMakeShared<APCLessEvent>();
|
||||||
|
SysCheckNotNullMemory(this->pEvent, {});
|
||||||
|
SysCheckReturn(this->pEvent->TryInit(false, false, true), {});
|
||||||
|
this->pEvent->wpParent = this->SharedFromThis();
|
||||||
|
return this->pEvent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool APCLessWaitable::HasCompletedForGCWI()
|
||||||
|
{
|
||||||
|
this->CheckLocal();
|
||||||
|
return this->bHasFinished;
|
||||||
|
}
|
||||||
|
|
||||||
|
void APCLessWaitable::CleanupForGCWI()
|
||||||
|
{
|
||||||
|
AuResetMember(this->pCompletionGroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool APCLessWaitable::HasBeenSignaled()
|
||||||
|
{
|
||||||
|
return this->bHasFinished;
|
||||||
|
}
|
||||||
|
|
||||||
|
void APCLessWaitable::SignalComplete()
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
|
||||||
|
this->bHasFinished = true;
|
||||||
|
|
||||||
|
if (this->pEvent)
|
||||||
|
{
|
||||||
|
this->pEvent->Set();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this->pCompletionGroup)
|
||||||
|
{
|
||||||
|
if (auto pOtherEvent = this->pCompletionGroup->GetTriggerLoopSource())
|
||||||
|
{
|
||||||
|
pOtherEvent->Set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void APCLessWaitable::Reset()
|
||||||
|
{
|
||||||
|
if (this->pEvent)
|
||||||
|
{
|
||||||
|
this->pEvent->Reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
this->bHasFinished = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void APCLessWaitable::CheckLocal()
|
||||||
|
{
|
||||||
|
if (!this->bHasFinished)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this->threadId == AuThreads::GetThreadId())
|
||||||
|
{
|
||||||
|
this->OnOriginThreadComplete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool APCLessWaitable::TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
|
||||||
|
if (!pCompletionGroup)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->pCompletionGroup)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
this->pCompletionGroup = pCompletionGroup;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuSPtr<CompletionGroup::ICompletionGroup> APCLessWaitable::GetCompletionGroup()
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(this->mutex);
|
||||||
|
return this->pCompletionGroup;
|
||||||
|
}
|
||||||
|
}
|
53
Source/IO/Async/AuIOAPCLessWaitable.hpp
Normal file
53
Source/IO/Async/AuIOAPCLessWaitable.hpp
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuIOAPCLessWaitable.hpp
|
||||||
|
Date: 2024-2-25
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Source/IO/Loop/LSEvent.hpp>
|
||||||
|
|
||||||
|
namespace Aurora::IO::Async
|
||||||
|
{
|
||||||
|
struct APCLessWaitable;
|
||||||
|
|
||||||
|
struct APCLessEvent : Loop::LSEvent
|
||||||
|
{
|
||||||
|
AuWPtr<APCLessWaitable> wpParent;
|
||||||
|
|
||||||
|
void OnFinishSleep() override;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct APCLessWaitable :
|
||||||
|
CompletionGroup::ICompletionGroupWorkItem,
|
||||||
|
AuEnableSharedFromThis<APCLessWaitable>
|
||||||
|
{
|
||||||
|
APCLessWaitable();
|
||||||
|
|
||||||
|
AuSPtr<Loop::ILoopSource> GetLoopSource();
|
||||||
|
void SignalComplete();
|
||||||
|
void CheckLocal();
|
||||||
|
|
||||||
|
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup);
|
||||||
|
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup();
|
||||||
|
|
||||||
|
void Reset();
|
||||||
|
|
||||||
|
virtual void OnOriginThreadComplete() = 0;
|
||||||
|
|
||||||
|
virtual bool HasCompletedForGCWI() override;
|
||||||
|
virtual void CleanupForGCWI() override;
|
||||||
|
|
||||||
|
bool HasBeenSignaled();
|
||||||
|
|
||||||
|
private:
|
||||||
|
AuUInt threadId {};
|
||||||
|
AuSPtr<CompletionGroup::ICompletionGroup> pCompletionGroup;
|
||||||
|
AuMutex mutex;
|
||||||
|
AuSPtr<APCLessEvent> pEvent;
|
||||||
|
bool bHasCalledBack {};
|
||||||
|
volatile bool bHasFinished {};
|
||||||
|
};
|
||||||
|
}
|
110
Source/IO/Async/AuIOThreadPool.cpp
Normal file
110
Source/IO/Async/AuIOThreadPool.cpp
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuIOThreadPool.cpp
|
||||||
|
Date: 2024-2-25
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <Source/RuntimeInternal.hpp>
|
||||||
|
#include "AuIOThreadPool.hpp"
|
||||||
|
|
||||||
|
namespace Aurora::IO::Async
|
||||||
|
{
|
||||||
|
static AuMutex gMutex;
|
||||||
|
static AuAsync::WorkerPId_t gDefaultGroup;
|
||||||
|
static AuSPtr<AuAsync::IThreadPool> gThreadPool;
|
||||||
|
static AuList<AuWPtr<AuAsync::IThreadPool>> gWorkersRegistered;
|
||||||
|
static AuUInt gJobRunners = 4;
|
||||||
|
|
||||||
|
AUKN_SYM void UseSpecifiedWorkerGroup(AuAsync::WorkerPId_t worker)
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(gMutex);
|
||||||
|
gDefaultGroup = worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
static AuAsync::WorkerPId_t SpawnDefaultGroup()
|
||||||
|
{
|
||||||
|
AuAsync::WorkerPId_t ret;
|
||||||
|
|
||||||
|
// TODO: gJobRunners = runtime config?
|
||||||
|
|
||||||
|
gThreadPool = AuAsync::NewThreadPool();
|
||||||
|
if (!gThreadPool)
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = { gThreadPool, 0, AuAsync::kThreadIdAny };
|
||||||
|
|
||||||
|
for (AU_ITERATE_N(i, gJobRunners))
|
||||||
|
{
|
||||||
|
AuAsync::WorkerPId_t copy = ret;
|
||||||
|
copy.second = i;
|
||||||
|
|
||||||
|
if (!gThreadPool->Spawn(copy))
|
||||||
|
{
|
||||||
|
if (i > 1)
|
||||||
|
{
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static AuAsync::WorkerPId_t GetAuxWorkerPool()
|
||||||
|
{
|
||||||
|
if (gDefaultGroup)
|
||||||
|
{
|
||||||
|
return gDefaultGroup;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return gDefaultGroup = SpawnDefaultGroup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AuAsync::WorkerPId_t GetAuxWorkerPoolAndRegister()
|
||||||
|
{
|
||||||
|
AU_LOCK_GUARD(gMutex);
|
||||||
|
|
||||||
|
if (auto worker = GetAuxWorkerPool())
|
||||||
|
{
|
||||||
|
if (auto selfWorker = AuAsync::GetCurrentWorkerPId())
|
||||||
|
{
|
||||||
|
if (gWorkersRegistered.size() > 10)
|
||||||
|
{
|
||||||
|
AuRemoveAllIf(gWorkersRegistered, [](AuWPtr<AuAsync::IThreadPool> wpThat)
|
||||||
|
{
|
||||||
|
return wpThat.expired();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto &wpThat : gWorkersRegistered)
|
||||||
|
{
|
||||||
|
if (auto pThat = AuTryLockMemoryType(wpThat))
|
||||||
|
{
|
||||||
|
if (pThat == selfWorker.GetPool())
|
||||||
|
{
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
gWorkersRegistered.push_back(selfWorker.GetPool());
|
||||||
|
selfWorker.GetPool()->AddDependency(worker.GetPool());
|
||||||
|
}
|
||||||
|
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
SysPanic("No async threadpool");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
13
Source/IO/Async/AuIOThreadPool.hpp
Normal file
13
Source/IO/Async/AuIOThreadPool.hpp
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: AuIOThreadPool.hpp
|
||||||
|
Date: 2024-2-25
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace Aurora::IO::Async
|
||||||
|
{
|
||||||
|
AuAsync::WorkerPId_t GetAuxWorkerPoolAndRegister();
|
||||||
|
}
|
@ -973,7 +973,7 @@ namespace Aurora::IO
|
|||||||
return processor;
|
return processor;
|
||||||
}
|
}
|
||||||
|
|
||||||
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id)
|
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, AuAsync::WorkerPId_t id)
|
||||||
{
|
{
|
||||||
if (!id.GetPool())
|
if (!id.GetPool())
|
||||||
{
|
{
|
||||||
|
@ -175,7 +175,7 @@ namespace Aurora::IO::CompletionGroup
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CompletionGroup::AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable)
|
void CompletionGroup::AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
|
||||||
{
|
{
|
||||||
SysAssert(!this->bTerminated, "Completion group already terminated");
|
SysAssert(!this->bTerminated, "Completion group already terminated");
|
||||||
AU_LOCK_GUARD(this->mutex);
|
AU_LOCK_GUARD(this->mutex);
|
||||||
@ -248,7 +248,7 @@ namespace Aurora::IO::CompletionGroup
|
|||||||
this->DoIOTick(true);
|
this->DoIOTick(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
AuSPtr<Async::IWorkItem> CompletionGroup::OnCompletion()
|
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnCompletion()
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->cs);
|
AU_LOCK_GUARD(this->cs);
|
||||||
|
|
||||||
@ -299,7 +299,7 @@ namespace Aurora::IO::CompletionGroup
|
|||||||
return pRet;
|
return pRet;
|
||||||
}
|
}
|
||||||
|
|
||||||
AuSPtr<Async::IWorkItem> CompletionGroup::OnSingleCompletion()
|
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletion()
|
||||||
{
|
{
|
||||||
AU_LOCK_GUARD(this->cs);
|
AU_LOCK_GUARD(this->cs);
|
||||||
|
|
||||||
|
@ -21,8 +21,8 @@ namespace Aurora::IO::CompletionGroup
|
|||||||
AuSPtr<Loop::ILoopSource> ToAndLoopSource() override;
|
AuSPtr<Loop::ILoopSource> ToAndLoopSource() override;
|
||||||
AuSPtr<Loop::ILoopSource> ToAnyLoopSource() override;
|
AuSPtr<Loop::ILoopSource> ToAnyLoopSource() override;
|
||||||
|
|
||||||
AuSPtr<Async::IWorkItem> OnCompletion() override;
|
AuSPtr<AuAsync::IWorkItem> OnCompletion() override;
|
||||||
AuSPtr<Async::IWorkItem> OnSingleCompletion() override;
|
AuSPtr<AuAsync::IWorkItem> OnSingleCompletion() override;
|
||||||
|
|
||||||
bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS) override;
|
bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS) override;
|
||||||
bool WaitForAllMS(AuUInt32 uTimeoutOrZeroMS) override;
|
bool WaitForAllMS(AuUInt32 uTimeoutOrZeroMS) override;
|
||||||
@ -39,7 +39,7 @@ namespace Aurora::IO::CompletionGroup
|
|||||||
void ResetMemoryPins();
|
void ResetMemoryPins();
|
||||||
bool HasItemsActive();
|
bool HasItemsActive();
|
||||||
|
|
||||||
void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) override;
|
void AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable) override;
|
||||||
|
|
||||||
void AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny);
|
void AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny);
|
||||||
|
|
||||||
@ -53,8 +53,8 @@ namespace Aurora::IO::CompletionGroup
|
|||||||
CompletionQuantumEventProvider andPlsDontAllocateFdIfUntouchedEvent;
|
CompletionQuantumEventProvider andPlsDontAllocateFdIfUntouchedEvent;
|
||||||
AuList<AuSPtr<ICompletionGroupWorkItem>> workItems;
|
AuList<AuSPtr<ICompletionGroupWorkItem>> workItems;
|
||||||
AuSPtr<ICompletionGroupHooks> pCallbacks;
|
AuSPtr<ICompletionGroupHooks> pCallbacks;
|
||||||
AuSPtr<Async::IWorkItem> pAnyBarrier;
|
AuSPtr<AuAsync::IWorkItem> pAnyBarrier;
|
||||||
AuSPtr<Async::IWorkItem> pAndBarrier;
|
AuSPtr<AuAsync::IWorkItem> pAndBarrier;
|
||||||
AuList<AuPair<AuSPtr<IIOProcessorManualInvoker>, bool>> callbackTicks;
|
AuList<AuPair<AuSPtr<IIOProcessorManualInvoker>, bool>> callbackTicks;
|
||||||
AuUInt32 uAdded {};
|
AuUInt32 uAdded {};
|
||||||
AuUInt32 uTriggered {};
|
AuUInt32 uTriggered {};
|
||||||
|
@ -12,16 +12,16 @@
|
|||||||
|
|
||||||
namespace Aurora::IO::CompletionGroup
|
namespace Aurora::IO::CompletionGroup
|
||||||
{
|
{
|
||||||
CompletionGroupAndedIOWorkItem::CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner,
|
CompletionGroupAndedIOWorkItem::CompletionGroupAndedIOWorkItem(AuAsync::IThreadPoolInternal *owner,
|
||||||
const AuWorkerID &worker,
|
const AuWorkerID &worker,
|
||||||
AuSPtr<CompletionGroup> pParent) :
|
AuSPtr<CompletionGroup> pParent) :
|
||||||
Async::WorkItem(owner, worker, nullptr),
|
AuAsync::WorkItem(owner, worker, nullptr),
|
||||||
pParent(pParent)
|
pParent(pParent)
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void CompletionGroupAndedIOWorkItem::DispatchTask(Async::IWorkItemHandler::ProcessInfo &info)
|
void CompletionGroupAndedIOWorkItem::DispatchTask(AuAsync::IWorkItemHandler::ProcessInfo &info)
|
||||||
{
|
{
|
||||||
this->pParent->DoIOTick(false);
|
this->pParent->DoIOTick(false);
|
||||||
|
|
||||||
|
@ -11,13 +11,13 @@
|
|||||||
|
|
||||||
namespace Aurora::IO::CompletionGroup
|
namespace Aurora::IO::CompletionGroup
|
||||||
{
|
{
|
||||||
struct CompletionGroupAndedIOWorkItem : Async::WorkItem
|
struct CompletionGroupAndedIOWorkItem : AuAsync::WorkItem
|
||||||
{
|
{
|
||||||
CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner,
|
CompletionGroupAndedIOWorkItem(AuAsync::IThreadPoolInternal *owner,
|
||||||
const AuWorkerID &worker,
|
const AuWorkerID &worker,
|
||||||
AuSPtr<CompletionGroup> pParent);
|
AuSPtr<CompletionGroup> pParent);
|
||||||
|
|
||||||
void DispatchTask(Async::IWorkItemHandler::ProcessInfo &info) override;
|
void DispatchTask(AuAsync::IWorkItemHandler::ProcessInfo &info) override;
|
||||||
AuSPtr<CompletionGroup> pParent;
|
AuSPtr<CompletionGroup> pParent;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -42,11 +42,11 @@ namespace Aurora::IO::Loop
|
|||||||
return AuConsole::StdInBufferLoopSource();
|
return AuConsole::StdInBufferLoopSource();
|
||||||
}
|
}
|
||||||
|
|
||||||
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid)
|
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(AuAsync::WorkerPId_t workerPid)
|
||||||
{
|
{
|
||||||
if (!workerPid)
|
if (!workerPid)
|
||||||
{
|
{
|
||||||
return Async::GetAsyncApp()->WorkerToLoopSource(workerPid);
|
return AuAsync::GetAsyncApp()->WorkerToLoopSource(workerPid);
|
||||||
}
|
}
|
||||||
|
|
||||||
return workerPid.GetPool()->WorkerToLoopSource(workerPid);
|
return workerPid.GetPool()->WorkerToLoopSource(workerPid);
|
||||||
|
Loading…
Reference in New Issue
Block a user