[+] IO Thread Pool
[+] FileSeekingWriter [+] ISeekingWriter [+] AuIO::Adapters::NewAsyncTransactionFromStreamReader [+] AuIO::Adapters::NewAsyncTransactionFromStreamSeekingReader [+] AuIO::Adapters::NewAsyncTransactionFromStreamWriter [+] AuIO::Adapters::NewAsyncTransactionFromStreamSeekingWriter [+] AuIO::Async::UseSpecifiedWorkerGroup [+] AuMemory::NewSharableResizableBuffer [+] AuMemory::NewSharableBuffer [*] Update comments
This commit is contained in:
parent
7dbf564a27
commit
1920f5a8d5
@ -123,7 +123,7 @@ namespace Aurora::Async
|
||||
cstatic WorkerPId_t FromLocalAnyInGroup(ThreadGroup_t group)
|
||||
{
|
||||
auto worker = GetCurrentWorkerPId();
|
||||
SysAssertDbg(worker, "No attached to a thread pool");
|
||||
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||
return FromJobRunner(worker.GetPool(), group);
|
||||
}
|
||||
|
||||
@ -131,14 +131,14 @@ namespace Aurora::Async
|
||||
ThreadId_t id)
|
||||
{
|
||||
auto worker = GetCurrentWorkerPId();
|
||||
SysAssertDbg(worker, "No attached to a thread pool");
|
||||
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||
return FromJobRunner(worker.GetPool(), group, id);
|
||||
}
|
||||
|
||||
cstatic WorkerPId_t FromLocalJobRunner(WorkerId_t id)
|
||||
{
|
||||
auto worker = GetCurrentWorkerPId();
|
||||
SysAssertDbg(worker, "No attached to a thread pool");
|
||||
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||
return FromJobRunner(worker.GetPool(), id);
|
||||
}
|
||||
|
||||
@ -146,14 +146,14 @@ namespace Aurora::Async
|
||||
ThreadId_t id)
|
||||
{
|
||||
auto worker = GetCurrentWorkerPId();
|
||||
SysAssertDbg(worker, "No attached to a thread pool");
|
||||
SysAssertDbg(worker, "Not attached to a thread pool");
|
||||
return FromJobRunner(worker.GetPool(), group, id);
|
||||
}
|
||||
|
||||
cstatic WorkerPId_t FromLocalThreadID(ThreadId_t id)
|
||||
{
|
||||
auto worker = GetCurrentWorkerPId();
|
||||
SysAssertDbg(worker, "No in thread");
|
||||
SysAssertDbg(worker, "Not in thread");
|
||||
return FromJobRunner(worker.GetPool(), worker.GetGroup(), id);
|
||||
}
|
||||
|
||||
|
16
Include/Aurora/IO/Adapters/IOAdapterAsyncDelegators.hpp
Normal file
16
Include/Aurora/IO/Adapters/IOAdapterAsyncDelegators.hpp
Normal file
@ -0,0 +1,16 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: IOAdapterAsyncDelegators.hpp
|
||||
Date: 2024-2-24
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
namespace Aurora::IO::Adapters
|
||||
{
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers);
|
||||
}
|
13
Include/Aurora/IO/Async/IOAsync.hpp
Normal file
13
Include/Aurora/IO/Async/IOAsync.hpp
Normal file
@ -0,0 +1,13 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: IOAsync.hpp
|
||||
Date: 2024-2-25
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
namespace Aurora::IO::Async
|
||||
{
|
||||
AUKN_SYM void UseSpecifiedWorkerGroup(Aurora::Async::WorkerPId_t worker);
|
||||
}
|
@ -31,7 +31,7 @@ namespace Aurora::IO::CompletionGroup
|
||||
virtual AuPair<AuUInt32, AuUInt32> GetStats() = 0;
|
||||
virtual void SetCallbacks(const AuSPtr<ICompletionGroupHooks> &pCallbacks) = 0;
|
||||
|
||||
virtual void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) = 0;
|
||||
virtual void AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable) = 0;
|
||||
|
||||
virtual AuSPtr<Loop::ILSEvent> GetTriggerLoopSource() = 0;
|
||||
virtual void TryTrigger() = 0;
|
||||
|
@ -80,6 +80,10 @@ namespace Aurora::IO::FS
|
||||
|
||||
/**
|
||||
* @brief Same as WriteFile except fails if the file already exists
|
||||
* Note the "New" series of functions ensures the data has been flushed via a blocking call.
|
||||
* If you want further control over when flushing occurs, you should use the open file APIs.
|
||||
* This is to ensure lazy POSIX-like applications can pass data between applications without
|
||||
* getting blocked by a partially written file or locked file descriptor.
|
||||
* @param path
|
||||
* @param blob
|
||||
* @return
|
||||
@ -97,6 +101,10 @@ namespace Aurora::IO::FS
|
||||
|
||||
/**
|
||||
* @brief Same as WriteString except fails if the file already exists
|
||||
* Note the "New" series of functions ensures the data has been flushed via a blocking call.
|
||||
* If you want further control over when flushing occurs, you should use the open file APIs.
|
||||
* This is to ensure lazy POSIX-like applications can pass data between applications without
|
||||
* getting blocked by a partially written file or locked file descriptor.
|
||||
* @param path
|
||||
* @param str
|
||||
* @return
|
||||
@ -300,6 +308,7 @@ namespace Aurora::IO::FS
|
||||
#include "IFileStream.hpp"
|
||||
#include "FileStream.hpp"
|
||||
#include "FileSeekableReader.hpp"
|
||||
#include "FileSeekableWriter.hpp"
|
||||
#include "FileReader.hpp"
|
||||
#include "FileWriter.hpp"
|
||||
#include "Resources.hpp"
|
||||
|
73
Include/Aurora/IO/FS/FileSeekableWriter.hpp
Normal file
73
Include/Aurora/IO/FS/FileSeekableWriter.hpp
Normal file
@ -0,0 +1,73 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: FileSeekableWriter.hpp
|
||||
Date: 2024-2-25
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
namespace Aurora::IO::FS
|
||||
{
|
||||
struct FileSeekableWriter : ISeekingWriter
|
||||
{
|
||||
AU_NO_COPY_NO_MOVE(FileSeekableWriter)
|
||||
|
||||
inline FileSeekableWriter() {}
|
||||
inline FileSeekableWriter(const AuSPtr<IFileStream> &stream) : stream_(stream) { }
|
||||
inline ~FileSeekableWriter() {}
|
||||
|
||||
template<typename... T>
|
||||
bool OpenFile(T... args)
|
||||
{
|
||||
this->stream_ = OpenWriteShared(args...);
|
||||
return bool(this->stream_);
|
||||
}
|
||||
|
||||
virtual EStreamError IsOpen() override
|
||||
{
|
||||
return this->stream_ ? EStreamError::eErrorNone : EStreamError::eErrorStreamInterrupted;
|
||||
}
|
||||
|
||||
virtual EStreamError ArbitraryWrite(AuUInt offset, const Memory::MemoryViewStreamRead ¶mters) override
|
||||
{
|
||||
if (!this->stream_)
|
||||
{
|
||||
return EStreamError::eErrorStreamNotOpen;
|
||||
}
|
||||
|
||||
if (!this->stream_->SetOffset(offset))
|
||||
{
|
||||
return EStreamError::eErrorEndOfStream;
|
||||
}
|
||||
|
||||
if (!this->stream_->Write(paramters))
|
||||
{
|
||||
return EStreamError::eErrorStreamInterrupted;
|
||||
}
|
||||
|
||||
if (paramters.outVariable == 0)
|
||||
{
|
||||
return EStreamError::eErrorEndOfStream;
|
||||
}
|
||||
|
||||
return EStreamError::eErrorNone;
|
||||
}
|
||||
|
||||
virtual void Flush() override
|
||||
{
|
||||
if (this->stream_)
|
||||
{
|
||||
this->stream_->Flush();
|
||||
}
|
||||
}
|
||||
|
||||
virtual void Close() override
|
||||
{
|
||||
this->stream_.reset();
|
||||
}
|
||||
|
||||
private:
|
||||
AuSPtr<IFileStream> stream_{};
|
||||
};
|
||||
}
|
@ -35,12 +35,31 @@ namespace Aurora::IO
|
||||
virtual bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) = 0;
|
||||
|
||||
/**
|
||||
* @brief "Non-blocking" is-signaled and call callback poll routine (similar to nt alertable sleeps of a period zero)
|
||||
* @brief "Non-blocking" is-signaled and call any callback routines (similar to nt alertable sleeps of a period zero)
|
||||
*/
|
||||
virtual bool Complete() = 0;
|
||||
|
||||
/**
|
||||
* @brief Block for completion and call any callback routines (similar to nt alertable sleeps)
|
||||
*/
|
||||
virtual bool Wait(AuUInt32 uTimeout) = 0;
|
||||
|
||||
/**
|
||||
* @brief Non-blocking has-failed (no callbacks, no alert sleep)
|
||||
* @warning Also note that we generally try to suppress end of stream "errors"
|
||||
* @warning If the underlying stream is empty or at the EoS/F point, expect a successful read of zero at least once.
|
||||
* Further reads may fail.
|
||||
*
|
||||
* This design philosophy extends to networking whereby various socket shutdown conditions that most applications would
|
||||
* consider to be an unsafe shutdown are suppressed as non-fatal. Not that you can't manually query the last error,
|
||||
* it's just that we shouldn't be generating massive errors logs and presenting stupid warnings to the client, over so
|
||||
* called "errors" that merely constitute somebodys mother vacuuming over an ethernet cable or a cell tower falling out of
|
||||
* range. In my estimation, IO errors and other high level errors as presented to an application should only be that of
|
||||
* logical errors, or that of us bailing out of an unsafe condition, or perhaps the kernel crying about IO resources. Some
|
||||
* smart-ass networking protocol being aware of an incomplete socket shutdown isn't worth noting, unless the user has
|
||||
* explicit wishes to log the exact reason of socket shutdown (ie: ISocket::GetError() under ISocketDriver::OnEnd)
|
||||
*
|
||||
* Likewise, this reasoning extends to IPC pipes; pipe breakage or other EoS condition should not result in an error.
|
||||
* @return
|
||||
*/
|
||||
virtual bool HasFailed() = 0;
|
||||
@ -68,14 +87,19 @@ namespace Aurora::IO
|
||||
*/
|
||||
virtual void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) = 0;
|
||||
|
||||
/**
|
||||
* @brief Block for completion
|
||||
*/
|
||||
virtual bool Wait(AuUInt32 uTimeout) = 0;
|
||||
|
||||
/**
|
||||
* @brief Provides a loop source that becomes signaled once the transaction is complete.
|
||||
* Polling the transaction may result in the execution of the callback.
|
||||
*
|
||||
* @warning This function needs refactoring from "New..." to "To..."
|
||||
* @warning This loop source may be that of the CompletionGroup::ICompletionGroup::ToAnyLoopSource(), if provided via TryAttachToCompletionGroup
|
||||
* @warning Remember to never provide the same loop source twice to the sam LoopQueue or to sleep twice on the same loop source using AuAsync
|
||||
*
|
||||
* To prevent logicial errors, you should only be sleeping on this loop source within your scope rather than delegating it a ILoopQueue or AuAsync to prevent double attaches
|
||||
* EG: pThat->NewLoopSource()->WaitOn(5 /\*ms*\/);
|
||||
* This is just a recommendation. If you are yielding on only a few file streams and can guarantee you aren't registering the source twice on a single LoopQueue, it's fine.
|
||||
*
|
||||
* If you want to ensure proper batching with minimal IO resources, and safe AuAsync interop, you should probably just use CompletionGroups
|
||||
*/
|
||||
virtual AuSPtr<IO::Loop::ILoopSource> NewLoopSource() = 0;
|
||||
|
||||
|
@ -12,7 +12,8 @@
|
||||
|
||||
#include "IStreamReader.hpp"
|
||||
#include "IStreamWriter.hpp"
|
||||
#include "ISeekingReader.hpp" // arbitrary read stream, dunno what you would want to call thousands_sep
|
||||
#include "ISeekingReader.hpp"
|
||||
#include "ISeekingWriter.hpp"
|
||||
|
||||
#include "Buffered/Buffered.hpp"
|
||||
|
||||
|
@ -54,6 +54,7 @@
|
||||
#include "Adapters/IOAdapterRandom.hpp"
|
||||
#include "Adapters/IOAdapterNOPs.hpp"
|
||||
#include "Adapters/IOAdapterZeros.hpp"
|
||||
#include "Adapters/IOAdapterAsyncDelegators.hpp"
|
||||
|
||||
#include "IIOWaitableIOTimer.hpp"
|
||||
#include "IIOWaitableIOLoopSource.hpp"
|
||||
@ -63,3 +64,5 @@
|
||||
#include "CompletionGroup/CompletionGroup.hpp"
|
||||
|
||||
#include "IOWaitableIOCompletionGroup.hpp"
|
||||
|
||||
#include "Async/IOAsync.hpp"
|
19
Include/Aurora/IO/ISeekingWriter.hpp
Normal file
19
Include/Aurora/IO/ISeekingWriter.hpp
Normal file
@ -0,0 +1,19 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: ISeekingWriter.hpp
|
||||
Date: 2024-2-25
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
namespace Aurora::IO
|
||||
{
|
||||
AUKN_INTERFACE(ISeekingWriter,
|
||||
AUI_METHOD(EStreamError, IsOpen, ()),
|
||||
AUI_METHOD(EStreamError, ArbitraryWrite, (AuUInt, uOffset,
|
||||
const Memory::MemoryViewStreamRead&, writeView)),
|
||||
AUI_METHOD(void, Flush, ()),
|
||||
AUI_METHOD(void, Close, ())
|
||||
);
|
||||
}
|
@ -84,7 +84,7 @@ namespace Aurora::IO::Loop
|
||||
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphore(AuUInt32 uInitialCount = 0);
|
||||
AUKN_SYM AuSPtr<ILSSemaphore> NewLSSemaphoreSlow(AuUInt32 uInitialCount = 0); // interop-ready (usable with DbgLoopSourceToReadFd)
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewLSOSHandle(AuUInt);
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid);
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Aurora::Async::WorkerPId_t workerPid);
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<IO::IAsyncTransaction> &fileTransaction);
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewStdIn();
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewLSWin32Source(bool dispatchMessages);
|
||||
|
@ -24,4 +24,21 @@ namespace Aurora::IO::Net
|
||||
|
||||
AUI_METHOD(void, OnFinalize, ())
|
||||
);
|
||||
|
||||
/*
|
||||
* On ISocketDriver::OnEnd() versus ISocketDriver::OnFatalErrorReported() per IAsyncTransaction::HasFailed()
|
||||
*
|
||||
* [...] various socket shutdown conditions that most applications would
|
||||
* consider to be an unsafe shutdown are suppressed as non-fatal. Not that you can't manually query the last error,
|
||||
* it's just that we shouldn't be generating massive errors logs and presenting stupid warnings to the client, over so
|
||||
* called "errors" that merely constitute somebodys mother vacuuming over an ethernet cable or a cell tower falling out of
|
||||
* range. In my estimation, IO errors and other high level errors as presented to an application should only be that of
|
||||
* logical errors, or that of us bailing out of an unsafe condition, or perhaps the kernel crying about IO resources. Some
|
||||
* smart-ass networking protocol being aware of an incomplete socket shutdown isn't worth noting, unless the user has
|
||||
* explicit wishes to log the exact reason of socket shutdown (ie: ISocket::GetError() under ISocketDriver::OnEnd())
|
||||
*
|
||||
* Therefore, OnFatalErrorReported() does not report some "critical" errors in transport, unless they're actually worth noting.
|
||||
* We simply treat them as though the socket is shutting down gracefully for ease of logging.
|
||||
* In either case, see `ISocket::GetError()` for the exact reason of shutdown.
|
||||
*/
|
||||
}
|
@ -599,16 +599,6 @@ namespace Aurora::Memory
|
||||
}
|
||||
};
|
||||
|
||||
static ByteBuffer NewResizableBuffer(AuUInt32 length = 0)
|
||||
{
|
||||
return ByteBuffer(length, false, true);
|
||||
}
|
||||
|
||||
static ByteBuffer NewRingBuffer(AuUInt32 length = 1024 * 5)
|
||||
{
|
||||
return ByteBuffer(length, true, false);
|
||||
}
|
||||
|
||||
struct SharedByteBuffer : ByteBuffer
|
||||
{
|
||||
AuSPtr<void> memory;
|
||||
@ -681,4 +671,35 @@ namespace Aurora::Memory
|
||||
return this->ToSharedReadView();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
static ByteBuffer NewResizableBuffer(AuUInt32 length = 0)
|
||||
{
|
||||
return ByteBuffer(length, false, true);
|
||||
}
|
||||
|
||||
static ByteBuffer NewRingBuffer(AuUInt32 length = 1024 * 5)
|
||||
{
|
||||
return ByteBuffer(length, true, false);
|
||||
}
|
||||
|
||||
static AuSPtr<SharableByteBuffer> NewSharableResizableBuffer(AuUInt32 length = 0)
|
||||
{
|
||||
auto pThat = AuMakeShared<SharableByteBuffer>(length, false, true);
|
||||
if (!(pThat && *pThat))
|
||||
{
|
||||
return {};
|
||||
}
|
||||
return pThat;
|
||||
}
|
||||
|
||||
static AuSPtr<SharableByteBuffer> NewSharableBuffer(AuUInt32 length = 0)
|
||||
{
|
||||
auto pThat = AuMakeShared<SharableByteBuffer>(length);
|
||||
if (!(pThat && *pThat))
|
||||
{
|
||||
return {};
|
||||
}
|
||||
return pThat;
|
||||
}
|
||||
}
|
273
Source/IO/Adapters/AuIOAdapterAsyncDelegators.cpp
Normal file
273
Source/IO/Adapters/AuIOAdapterAsyncDelegators.cpp
Normal file
@ -0,0 +1,273 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: AuIOAdapterAsyncDelegators.cpp
|
||||
Date: 2024-2-24
|
||||
Author: Reece
|
||||
***/
|
||||
#include <Source/RuntimeInternal.hpp>
|
||||
#include "AuIOAdapterAsyncDelegators.hpp"
|
||||
#include <Source/IO/Async/AuIOThreadPool.hpp>
|
||||
#include <Source/IO/Async/AuIOAPCLessWaitable.hpp>
|
||||
|
||||
namespace Aurora::IO::Adapters
|
||||
{
|
||||
struct AsyncReaderWriter : IAsyncTransaction,
|
||||
Async::APCLessWaitable,
|
||||
AuAsync::IWorkItemHandler
|
||||
{
|
||||
AuSPtr<IStreamReader> pStreamReader;
|
||||
AuSPtr<ISeekingReader> pStreamReaderEx;
|
||||
AuSPtr<IStreamWriter> pStreamWriter;
|
||||
AuSPtr<ISeekingWriter> pStreamWriterEx;
|
||||
AuSPtr<Memory::MemoryViewRead> pReadView;
|
||||
AuSPtr<Memory::MemoryViewWrite> pWriteView;
|
||||
AuWorkerPId workers;
|
||||
|
||||
void DispatchFrame(ProcessInfo &info) override
|
||||
{
|
||||
try
|
||||
{
|
||||
if (this->pReadView)
|
||||
{
|
||||
if (this->pStreamWriter)
|
||||
{
|
||||
this->eStreamError = this->pStreamWriter->Write(AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
|
||||
}
|
||||
else if (this->pStreamWriterEx)
|
||||
{
|
||||
this->eStreamError = this->pStreamWriterEx->ArbitraryWrite(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamRead(*this->pReadView, this->uLastLength));
|
||||
}
|
||||
}
|
||||
else if (this->pWriteView)
|
||||
{
|
||||
if (this->pStreamReader)
|
||||
{
|
||||
this->eStreamError = this->pStreamReader->Read(AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
|
||||
}
|
||||
else if (this->pStreamReaderEx)
|
||||
{
|
||||
this->eStreamError = this->pStreamReaderEx->ArbitraryRead(this->uBaseOffset + this->uLastOffset, AuMemoryViewStreamWrite(*this->pWriteView, this->uLastLength));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
this->eStreamError = EStreamError::eErrorGenericFault;
|
||||
}
|
||||
|
||||
this->bInProgress = false;
|
||||
this->SignalComplete();
|
||||
}
|
||||
|
||||
void OnFailure()
|
||||
{
|
||||
this->bInProgress = false;
|
||||
this->Reset();
|
||||
}
|
||||
|
||||
bool StartRead(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewWrite> &memoryView) override
|
||||
{
|
||||
if (!memoryView)
|
||||
{
|
||||
SysPushErrorArg();
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this->bInProgress)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this->pStreamReaderEx && !this->pStreamReader)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
AuResetMember(this->pReadView);
|
||||
AuResetMember(this->pWriteView);
|
||||
this->bInProgress = true;
|
||||
this->uLastOffset = uOffset;
|
||||
this->pWriteView = memoryView;
|
||||
|
||||
auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
|
||||
if (!pThat)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return pThat->Dispatch();
|
||||
}
|
||||
|
||||
bool StartWrite(AuUInt64 uOffset, const AuSPtr<Memory::MemoryViewRead> &memoryView) override
|
||||
{
|
||||
if (!memoryView)
|
||||
{
|
||||
SysPushErrorArg();
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this->bInProgress)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this->pStreamReaderEx && !this->pStreamReader)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
AuResetMember(this->pReadView);
|
||||
AuResetMember(this->pWriteView);
|
||||
this->bInProgress = true;
|
||||
this->uLastOffset = uOffset;
|
||||
this->pReadView = memoryView;
|
||||
|
||||
auto pThat = AuAsync::NewWorkItem(this->workers, AuStaticCast<AsyncReaderWriter>(this->SharedFromThis()));
|
||||
if (!pThat)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return pThat->Dispatch();
|
||||
}
|
||||
|
||||
AuUInt32 GetLastPacketLength() override
|
||||
{
|
||||
return this->uLastLength;
|
||||
}
|
||||
|
||||
void OnOriginThreadComplete() override
|
||||
{
|
||||
if (this->pSubscriber)
|
||||
{
|
||||
this->pSubscriber->OnAsyncFileOpFinished(this->uBaseOffset + this->uLastOffset, this->GetLastPacketLength());
|
||||
}
|
||||
|
||||
AuResetMember(this->pReadView);
|
||||
AuResetMember(this->pWriteView);
|
||||
}
|
||||
|
||||
bool Complete() override
|
||||
{
|
||||
Async::APCLessWaitable::CheckLocal();
|
||||
return Async::APCLessWaitable::HasBeenSignaled();
|
||||
}
|
||||
|
||||
bool HasFailed() override
|
||||
{
|
||||
return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError != EStreamError::eErrorNone;
|
||||
}
|
||||
|
||||
bool HasCompleted() override
|
||||
{
|
||||
return Async::APCLessWaitable::HasBeenSignaled() && this->eStreamError == EStreamError::eErrorNone;
|
||||
}
|
||||
|
||||
AuUInt GetOSErrorCode() override
|
||||
{
|
||||
return this->HasFailed() ? AuUInt(this->eStreamError) : 0;
|
||||
}
|
||||
|
||||
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &pSubscriber) override
|
||||
{
|
||||
this->pSubscriber = pSubscriber;
|
||||
}
|
||||
|
||||
bool Wait(AuUInt32 uTimeout) override
|
||||
{
|
||||
return NewLoopSource()->WaitOn(uTimeout);
|
||||
}
|
||||
|
||||
AuSPtr<IO::Loop::ILoopSource> NewLoopSource() override
|
||||
{
|
||||
return Async::APCLessWaitable::GetLoopSource();
|
||||
}
|
||||
|
||||
void Reset() override
|
||||
{
|
||||
if (this->bInProgress)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
AuResetMember(this->pReadView);
|
||||
AuResetMember(this->pWriteView);
|
||||
Async::APCLessWaitable::Reset();
|
||||
}
|
||||
|
||||
void SetBaseOffset(AuUInt64 uBaseOffset) override
|
||||
{
|
||||
this->uBaseOffset = uBaseOffset;
|
||||
}
|
||||
|
||||
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup) override
|
||||
{
|
||||
return Async::APCLessWaitable::TryAttachToCompletionGroup(pCompletionGroup);
|
||||
}
|
||||
|
||||
CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override
|
||||
{
|
||||
return this;
|
||||
}
|
||||
|
||||
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup() override
|
||||
{
|
||||
return Async::APCLessWaitable::GetCompletionGroup();
|
||||
}
|
||||
|
||||
private:
|
||||
AuMutex mutex;
|
||||
AuSPtr<IAsyncFinishedSubscriber> pSubscriber;
|
||||
AuUInt64 uBaseOffset {};
|
||||
AuUInt64 uLastOffset {};
|
||||
AuUInt uLastLength {};
|
||||
bool bInProgress {};
|
||||
EStreamError eStreamError = EStreamError::eErrorNone;
|
||||
};
|
||||
|
||||
static AuWorkerPId GetAuxWorkerPool()
|
||||
{
|
||||
return Async::GetAuxWorkerPoolAndRegister();
|
||||
}
|
||||
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamReader(const AuSPtr<IStreamReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
|
||||
{
|
||||
SysCheckArgNotNull(pStreamReader, {});
|
||||
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||
SysCheckNotNullMemory(pObject, {});
|
||||
pObject->pStreamReader = pStreamReader;
|
||||
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||
return pObject;
|
||||
}
|
||||
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingReader(const AuSPtr<ISeekingReader> &pStreamReader, AuOptional<AuWorkerPId> workers)
|
||||
{
|
||||
SysCheckArgNotNull(pStreamReader, {});
|
||||
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||
SysCheckNotNullMemory(pObject, {});
|
||||
pObject->pStreamReaderEx = pStreamReader;
|
||||
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||
return pObject;
|
||||
}
|
||||
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamWriter(const AuSPtr<IStreamWriter> &pStreamWriter, AuOptional<AuWorkerPId> workers)
|
||||
{
|
||||
SysCheckArgNotNull(pStreamWriter, {});
|
||||
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||
SysCheckNotNullMemory(pObject, {});
|
||||
pObject->pStreamWriter = pStreamWriter;
|
||||
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||
return pObject;
|
||||
}
|
||||
|
||||
AUKN_SYM AuSPtr<IAsyncTransaction> NewAsyncTransactionFromStreamSeekingWriter(const AuSPtr<ISeekingWriter> &pStreamWriter, AuOptional<Aurora::Async::WorkerPId_t> workers)
|
||||
{
|
||||
SysCheckArgNotNull(pStreamWriter, {});
|
||||
auto pObject = AuMakeShared<AsyncReaderWriter>();
|
||||
SysCheckNotNullMemory(pObject, {});
|
||||
pObject->pStreamWriterEx = pStreamWriter;
|
||||
pObject->workers = workers.ValueOr(GetAuxWorkerPool());
|
||||
return pObject;
|
||||
}
|
||||
}
|
12
Source/IO/Adapters/AuIOAdapterAsyncDelegators.hpp
Normal file
12
Source/IO/Adapters/AuIOAdapterAsyncDelegators.hpp
Normal file
@ -0,0 +1,12 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: AuIOAdapterAsyncDelegators.hpp
|
||||
Date: 2024-2-24
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
namespace Aurora::IO::Adapters
|
||||
{
|
||||
}
|
130
Source/IO/Async/AuIOAPCLessWaitable.cpp
Normal file
130
Source/IO/Async/AuIOAPCLessWaitable.cpp
Normal file
@ -0,0 +1,130 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: AuIOAPCLessWaitable.cpp
|
||||
Date: 2024-2-25
|
||||
Author: Reece
|
||||
***/
|
||||
#include <Source/RuntimeInternal.hpp>
|
||||
#include "AuIOAPCLessWaitable.hpp"
|
||||
|
||||
namespace Aurora::IO::Async
|
||||
{
|
||||
void APCLessEvent::OnFinishSleep()
|
||||
{
|
||||
if (auto pThat = AuTryLockMemoryType(this->wpParent))
|
||||
{
|
||||
pThat->CheckLocal();
|
||||
}
|
||||
}
|
||||
|
||||
APCLessWaitable::APCLessWaitable()
|
||||
{
|
||||
this->threadId = AuThreads::GetThreadId();
|
||||
}
|
||||
|
||||
AuSPtr<Loop::ILoopSource> APCLessWaitable::GetLoopSource()
|
||||
{
|
||||
AU_LOCK_GUARD(this->mutex);
|
||||
|
||||
if (this->pCompletionGroup)
|
||||
{
|
||||
return this->pCompletionGroup->ToAnyLoopSource();
|
||||
}
|
||||
else if (this->pEvent)
|
||||
{
|
||||
return this->pEvent;
|
||||
}
|
||||
else
|
||||
{
|
||||
this->pEvent = AuMakeShared<APCLessEvent>();
|
||||
SysCheckNotNullMemory(this->pEvent, {});
|
||||
SysCheckReturn(this->pEvent->TryInit(false, false, true), {});
|
||||
this->pEvent->wpParent = this->SharedFromThis();
|
||||
return this->pEvent;
|
||||
}
|
||||
}
|
||||
|
||||
bool APCLessWaitable::HasCompletedForGCWI()
|
||||
{
|
||||
this->CheckLocal();
|
||||
return this->bHasFinished;
|
||||
}
|
||||
|
||||
void APCLessWaitable::CleanupForGCWI()
|
||||
{
|
||||
AuResetMember(this->pCompletionGroup);
|
||||
}
|
||||
|
||||
bool APCLessWaitable::HasBeenSignaled()
|
||||
{
|
||||
return this->bHasFinished;
|
||||
}
|
||||
|
||||
void APCLessWaitable::SignalComplete()
|
||||
{
|
||||
AU_LOCK_GUARD(this->mutex);
|
||||
|
||||
this->bHasFinished = true;
|
||||
|
||||
if (this->pEvent)
|
||||
{
|
||||
this->pEvent->Set();
|
||||
}
|
||||
|
||||
if (this->pCompletionGroup)
|
||||
{
|
||||
if (auto pOtherEvent = this->pCompletionGroup->GetTriggerLoopSource())
|
||||
{
|
||||
pOtherEvent->Set();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void APCLessWaitable::Reset()
|
||||
{
|
||||
if (this->pEvent)
|
||||
{
|
||||
this->pEvent->Reset();
|
||||
}
|
||||
|
||||
this->bHasFinished = false;
|
||||
}
|
||||
|
||||
void APCLessWaitable::CheckLocal()
|
||||
{
|
||||
if (!this->bHasFinished)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (this->threadId == AuThreads::GetThreadId())
|
||||
{
|
||||
this->OnOriginThreadComplete();
|
||||
}
|
||||
}
|
||||
|
||||
bool APCLessWaitable::TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup)
|
||||
{
|
||||
AU_LOCK_GUARD(this->mutex);
|
||||
|
||||
if (!pCompletionGroup)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this->pCompletionGroup)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
this->pCompletionGroup = pCompletionGroup;
|
||||
return true;
|
||||
}
|
||||
|
||||
AuSPtr<CompletionGroup::ICompletionGroup> APCLessWaitable::GetCompletionGroup()
|
||||
{
|
||||
AU_LOCK_GUARD(this->mutex);
|
||||
return this->pCompletionGroup;
|
||||
}
|
||||
}
|
53
Source/IO/Async/AuIOAPCLessWaitable.hpp
Normal file
53
Source/IO/Async/AuIOAPCLessWaitable.hpp
Normal file
@ -0,0 +1,53 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: AuIOAPCLessWaitable.hpp
|
||||
Date: 2024-2-25
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
#include <Source/IO/Loop/LSEvent.hpp>
|
||||
|
||||
namespace Aurora::IO::Async
|
||||
{
|
||||
struct APCLessWaitable;
|
||||
|
||||
struct APCLessEvent : Loop::LSEvent
|
||||
{
|
||||
AuWPtr<APCLessWaitable> wpParent;
|
||||
|
||||
void OnFinishSleep() override;
|
||||
};
|
||||
|
||||
struct APCLessWaitable :
|
||||
CompletionGroup::ICompletionGroupWorkItem,
|
||||
AuEnableSharedFromThis<APCLessWaitable>
|
||||
{
|
||||
APCLessWaitable();
|
||||
|
||||
AuSPtr<Loop::ILoopSource> GetLoopSource();
|
||||
void SignalComplete();
|
||||
void CheckLocal();
|
||||
|
||||
bool TryAttachToCompletionGroup(const AuSPtr<CompletionGroup::ICompletionGroup> &pCompletionGroup);
|
||||
AuSPtr<CompletionGroup::ICompletionGroup> GetCompletionGroup();
|
||||
|
||||
void Reset();
|
||||
|
||||
virtual void OnOriginThreadComplete() = 0;
|
||||
|
||||
virtual bool HasCompletedForGCWI() override;
|
||||
virtual void CleanupForGCWI() override;
|
||||
|
||||
bool HasBeenSignaled();
|
||||
|
||||
private:
|
||||
AuUInt threadId {};
|
||||
AuSPtr<CompletionGroup::ICompletionGroup> pCompletionGroup;
|
||||
AuMutex mutex;
|
||||
AuSPtr<APCLessEvent> pEvent;
|
||||
bool bHasCalledBack {};
|
||||
volatile bool bHasFinished {};
|
||||
};
|
||||
}
|
110
Source/IO/Async/AuIOThreadPool.cpp
Normal file
110
Source/IO/Async/AuIOThreadPool.cpp
Normal file
@ -0,0 +1,110 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: AuIOThreadPool.cpp
|
||||
Date: 2024-2-25
|
||||
Author: Reece
|
||||
***/
|
||||
#include <Source/RuntimeInternal.hpp>
|
||||
#include "AuIOThreadPool.hpp"
|
||||
|
||||
namespace Aurora::IO::Async
|
||||
{
|
||||
static AuMutex gMutex;
|
||||
static AuAsync::WorkerPId_t gDefaultGroup;
|
||||
static AuSPtr<AuAsync::IThreadPool> gThreadPool;
|
||||
static AuList<AuWPtr<AuAsync::IThreadPool>> gWorkersRegistered;
|
||||
static AuUInt gJobRunners = 4;
|
||||
|
||||
AUKN_SYM void UseSpecifiedWorkerGroup(AuAsync::WorkerPId_t worker)
|
||||
{
|
||||
AU_LOCK_GUARD(gMutex);
|
||||
gDefaultGroup = worker;
|
||||
}
|
||||
|
||||
static AuAsync::WorkerPId_t SpawnDefaultGroup()
|
||||
{
|
||||
AuAsync::WorkerPId_t ret;
|
||||
|
||||
// TODO: gJobRunners = runtime config?
|
||||
|
||||
gThreadPool = AuAsync::NewThreadPool();
|
||||
if (!gThreadPool)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
ret = { gThreadPool, 0, AuAsync::kThreadIdAny };
|
||||
|
||||
for (AU_ITERATE_N(i, gJobRunners))
|
||||
{
|
||||
AuAsync::WorkerPId_t copy = ret;
|
||||
copy.second = i;
|
||||
|
||||
if (!gThreadPool->Spawn(copy))
|
||||
{
|
||||
if (i > 1)
|
||||
{
|
||||
return ret;
|
||||
}
|
||||
else
|
||||
{
|
||||
return {};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static AuAsync::WorkerPId_t GetAuxWorkerPool()
|
||||
{
|
||||
if (gDefaultGroup)
|
||||
{
|
||||
return gDefaultGroup;
|
||||
}
|
||||
else
|
||||
{
|
||||
return gDefaultGroup = SpawnDefaultGroup();
|
||||
}
|
||||
}
|
||||
|
||||
AuAsync::WorkerPId_t GetAuxWorkerPoolAndRegister()
|
||||
{
|
||||
AU_LOCK_GUARD(gMutex);
|
||||
|
||||
if (auto worker = GetAuxWorkerPool())
|
||||
{
|
||||
if (auto selfWorker = AuAsync::GetCurrentWorkerPId())
|
||||
{
|
||||
if (gWorkersRegistered.size() > 10)
|
||||
{
|
||||
AuRemoveAllIf(gWorkersRegistered, [](AuWPtr<AuAsync::IThreadPool> wpThat)
|
||||
{
|
||||
return wpThat.expired();
|
||||
});
|
||||
}
|
||||
|
||||
for (const auto &wpThat : gWorkersRegistered)
|
||||
{
|
||||
if (auto pThat = AuTryLockMemoryType(wpThat))
|
||||
{
|
||||
if (pThat == selfWorker.GetPool())
|
||||
{
|
||||
return worker;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gWorkersRegistered.push_back(selfWorker.GetPool());
|
||||
selfWorker.GetPool()->AddDependency(worker.GetPool());
|
||||
}
|
||||
|
||||
return worker;
|
||||
}
|
||||
else
|
||||
{
|
||||
SysPanic("No async threadpool");
|
||||
}
|
||||
}
|
||||
}
|
13
Source/IO/Async/AuIOThreadPool.hpp
Normal file
13
Source/IO/Async/AuIOThreadPool.hpp
Normal file
@ -0,0 +1,13 @@
|
||||
/***
|
||||
Copyright (C) 2024 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||
|
||||
File: AuIOThreadPool.hpp
|
||||
Date: 2024-2-25
|
||||
Author: Reece
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
namespace Aurora::IO::Async
|
||||
{
|
||||
AuAsync::WorkerPId_t GetAuxWorkerPoolAndRegister();
|
||||
}
|
@ -973,7 +973,7 @@ namespace Aurora::IO
|
||||
return processor;
|
||||
}
|
||||
|
||||
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, Async::WorkerPId_t id)
|
||||
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessorOnThread(bool tickOnly, AuAsync::WorkerPId_t id)
|
||||
{
|
||||
if (!id.GetPool())
|
||||
{
|
||||
|
@ -175,7 +175,7 @@ namespace Aurora::IO::CompletionGroup
|
||||
}
|
||||
}
|
||||
|
||||
void CompletionGroup::AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable)
|
||||
void CompletionGroup::AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable)
|
||||
{
|
||||
SysAssert(!this->bTerminated, "Completion group already terminated");
|
||||
AU_LOCK_GUARD(this->mutex);
|
||||
@ -248,7 +248,7 @@ namespace Aurora::IO::CompletionGroup
|
||||
this->DoIOTick(true);
|
||||
}
|
||||
|
||||
AuSPtr<Async::IWorkItem> CompletionGroup::OnCompletion()
|
||||
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnCompletion()
|
||||
{
|
||||
AU_LOCK_GUARD(this->cs);
|
||||
|
||||
@ -299,7 +299,7 @@ namespace Aurora::IO::CompletionGroup
|
||||
return pRet;
|
||||
}
|
||||
|
||||
AuSPtr<Async::IWorkItem> CompletionGroup::OnSingleCompletion()
|
||||
AuSPtr<AuAsync::IWorkItem> CompletionGroup::OnSingleCompletion()
|
||||
{
|
||||
AU_LOCK_GUARD(this->cs);
|
||||
|
||||
|
@ -21,8 +21,8 @@ namespace Aurora::IO::CompletionGroup
|
||||
AuSPtr<Loop::ILoopSource> ToAndLoopSource() override;
|
||||
AuSPtr<Loop::ILoopSource> ToAnyLoopSource() override;
|
||||
|
||||
AuSPtr<Async::IWorkItem> OnCompletion() override;
|
||||
AuSPtr<Async::IWorkItem> OnSingleCompletion() override;
|
||||
AuSPtr<AuAsync::IWorkItem> OnCompletion() override;
|
||||
AuSPtr<AuAsync::IWorkItem> OnSingleCompletion() override;
|
||||
|
||||
bool WaitForAnyMS(AuUInt32 uTimeoutOrZeroMS) override;
|
||||
bool WaitForAllMS(AuUInt32 uTimeoutOrZeroMS) override;
|
||||
@ -39,7 +39,7 @@ namespace Aurora::IO::CompletionGroup
|
||||
void ResetMemoryPins();
|
||||
bool HasItemsActive();
|
||||
|
||||
void AddWorkItem(AuSPtr<ICompletionGroupWorkItem> pCompletable) override;
|
||||
void AddWorkItem(const AuSPtr<ICompletionGroupWorkItem> &pCompletable) override;
|
||||
|
||||
void AddCallbackTick(const AuSPtr<IIOProcessorManualInvoker> &pCallbackInvoker, bool bAny);
|
||||
|
||||
@ -53,8 +53,8 @@ namespace Aurora::IO::CompletionGroup
|
||||
CompletionQuantumEventProvider andPlsDontAllocateFdIfUntouchedEvent;
|
||||
AuList<AuSPtr<ICompletionGroupWorkItem>> workItems;
|
||||
AuSPtr<ICompletionGroupHooks> pCallbacks;
|
||||
AuSPtr<Async::IWorkItem> pAnyBarrier;
|
||||
AuSPtr<Async::IWorkItem> pAndBarrier;
|
||||
AuSPtr<AuAsync::IWorkItem> pAnyBarrier;
|
||||
AuSPtr<AuAsync::IWorkItem> pAndBarrier;
|
||||
AuList<AuPair<AuSPtr<IIOProcessorManualInvoker>, bool>> callbackTicks;
|
||||
AuUInt32 uAdded {};
|
||||
AuUInt32 uTriggered {};
|
||||
|
@ -12,16 +12,16 @@
|
||||
|
||||
namespace Aurora::IO::CompletionGroup
|
||||
{
|
||||
CompletionGroupAndedIOWorkItem::CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner,
|
||||
CompletionGroupAndedIOWorkItem::CompletionGroupAndedIOWorkItem(AuAsync::IThreadPoolInternal *owner,
|
||||
const AuWorkerID &worker,
|
||||
AuSPtr<CompletionGroup> pParent) :
|
||||
Async::WorkItem(owner, worker, nullptr),
|
||||
AuAsync::WorkItem(owner, worker, nullptr),
|
||||
pParent(pParent)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void CompletionGroupAndedIOWorkItem::DispatchTask(Async::IWorkItemHandler::ProcessInfo &info)
|
||||
void CompletionGroupAndedIOWorkItem::DispatchTask(AuAsync::IWorkItemHandler::ProcessInfo &info)
|
||||
{
|
||||
this->pParent->DoIOTick(false);
|
||||
|
||||
|
@ -11,13 +11,13 @@
|
||||
|
||||
namespace Aurora::IO::CompletionGroup
|
||||
{
|
||||
struct CompletionGroupAndedIOWorkItem : Async::WorkItem
|
||||
struct CompletionGroupAndedIOWorkItem : AuAsync::WorkItem
|
||||
{
|
||||
CompletionGroupAndedIOWorkItem(Async::IThreadPoolInternal *owner,
|
||||
CompletionGroupAndedIOWorkItem(AuAsync::IThreadPoolInternal *owner,
|
||||
const AuWorkerID &worker,
|
||||
AuSPtr<CompletionGroup> pParent);
|
||||
|
||||
void DispatchTask(Async::IWorkItemHandler::ProcessInfo &info) override;
|
||||
void DispatchTask(AuAsync::IWorkItemHandler::ProcessInfo &info) override;
|
||||
AuSPtr<CompletionGroup> pParent;
|
||||
};
|
||||
|
||||
|
@ -42,11 +42,11 @@ namespace Aurora::IO::Loop
|
||||
return AuConsole::StdInBufferLoopSource();
|
||||
}
|
||||
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(Async::WorkerPId_t workerPid)
|
||||
AUKN_SYM AuSPtr<ILoopSource> NewLSAsync(AuAsync::WorkerPId_t workerPid)
|
||||
{
|
||||
if (!workerPid)
|
||||
{
|
||||
return Async::GetAsyncApp()->WorkerToLoopSource(workerPid);
|
||||
return AuAsync::GetAsyncApp()->WorkerToLoopSource(workerPid);
|
||||
}
|
||||
|
||||
return workerPid.GetPool()->WorkerToLoopSource(workerPid);
|
||||
|
Loading…
Reference in New Issue
Block a user