diff --git a/Source/Async/WorkItem.cpp b/Source/Async/WorkItem.cpp index 7123e8e6..1be045ee 100644 --- a/Source/Async/WorkItem.cpp +++ b/Source/Async/WorkItem.cpp @@ -66,7 +66,7 @@ namespace Aurora::Async return AU_SHARED_FROM_THIS; } - if (!AuTryInsert(this->waitOn_, workItem)) + if (!AuTryInsert(this->waitOn_, AuConstReference(workItem))) { AuTryRemove(dependency->waiters_, AuSharedFromThis()); Fail(); @@ -100,7 +100,7 @@ namespace Aurora::Async return false; } - if (!AuTryInsert(this->waitOn_, workItem)) + if (!AuTryInsert(this->waitOn_, AuConstReference(workItem))) { AuTryRemove(dependency->waiters_, AuSharedFromThis()); return false; diff --git a/Source/IO/AuIOPipeProcessor.cpp b/Source/IO/AuIOPipeProcessor.cpp index 447a1aed..839b6465 100644 --- a/Source/IO/AuIOPipeProcessor.cpp +++ b/Source/IO/AuIOPipeProcessor.cpp @@ -134,6 +134,10 @@ namespace Aurora::IO return false; } } + else + { + return false; + } } else { diff --git a/Source/IO/FS/Async.Linux.cpp b/Source/IO/FS/Async.Linux.cpp index 98a8cc7b..4fc7997b 100644 --- a/Source/IO/FS/Async.Linux.cpp +++ b/Source/IO/FS/Async.Linux.cpp @@ -268,8 +268,7 @@ namespace Aurora::IO::FS return false; } this->pHandle_ = handle; - this->loopSource_ = AuMakeShared(AuSharedFromThis()); - return bool(this->loopSource_); + return true;//bool(this->loopSource_); } void LinuxAsyncFileTransaction::SetBaseOffset(AuUInt64 uBaseOffset) @@ -285,6 +284,11 @@ namespace Aurora::IO::FS return false; } + if (!this->pHandle_) + { + return false; + } + auto iOptSafe = this->pHandle_->GetOSReadHandleSafe(); if (!iOptSafe) { @@ -303,13 +307,21 @@ namespace Aurora::IO::FS this->bTxFinished_ = false; this->lastFinishedStat_ = 0; - if (!this->loopSource_) + if (!this->pCompletionGroup_ && !this->loopSource_) + { + this->loopSource_ = AuMakeShared(AuSharedFromThis()); + } + + if (!this->pCompletionGroup_ && !this->loopSource_) { SysPushErrorUninitialized(); return false; } - this->loopSource_->Reset(); + if (this->loopSource_) + { + this->loopSource_->Reset(); + } this->lastAbstractOffset_ = offset; @@ -334,7 +346,7 @@ namespace Aurora::IO::FS offset += this->uBaseOffset; - if (!UNIX::LinuxOverlappedSubmitRead(fd, offset, this, this->loopSource_.get(), bool(IPC_PIPE))) + if (!UNIX::LinuxOverlappedSubmitRead(fd, offset, this, this->pCompletionGroup_ ? this->pAltEvent : this->loopSource_.get(), bool(IPC_PIPE))) { LIOS_SendProcess(0, true, errno); return true; @@ -358,6 +370,11 @@ namespace Aurora::IO::FS return false; } + if (!this->pHandle_) + { + return false; + } + auto iOptSafe = this->pHandle_->GetOSWriteHandleSafe(); if (!iOptSafe) { @@ -376,13 +393,21 @@ namespace Aurora::IO::FS this->hasError_ = false; this->lastFinishedStat_ = 0; - if (!this->loopSource_) + if (!this->pCompletionGroup_ && !this->loopSource_) + { + this->loopSource_ = AuMakeShared(AuSharedFromThis()); + } + + if (!this->pCompletionGroup_ && !this->loopSource_) { SysPushErrorUninitialized(); return false; } - this->loopSource_->Reset(); + if (this->loopSource_) + { + this->loopSource_->Reset(); + } this->lastAbstractOffset_ = offset; @@ -398,7 +423,7 @@ namespace Aurora::IO::FS offset += this->uBaseOffset; - if (!UNIX::LinuxOverlappedSubmitWrite(fd, offset, this, this->loopSource_.get())) + if (!UNIX::LinuxOverlappedSubmitWrite(fd, offset, this, this->pCompletionGroup_ ? this->pAltEvent : this->loopSource_.get())) { LIOS_SendProcess(0, true, errno); return false; @@ -424,6 +449,44 @@ namespace Aurora::IO::FS } } + bool LinuxAsyncFileTransaction::TryAttachToCompletionGroup(const AuSPtr &pCompletionGroup) + { + if (!pCompletionGroup) + { + return false; + } + + auto pLoopSource = pCompletionGroup->GetTriggerLoopSource(); + if (!pLoopSource) + { + return false; + } + + pCompletionGroup->AddWorkItem(this->SharedFromThis()); + this->pAltEvent = AuStaticCast(pLoopSource).get(); + return true; + } + + CompletionGroup::ICompletionGroupWorkHandle *LinuxAsyncFileTransaction::ToCompletionGroupHandle() + { + return this; + } + + AuSPtr LinuxAsyncFileTransaction::GetCompletionGroup() + { + return this->pCompletionGroup_; + } + + bool LinuxAsyncFileTransaction::HasCompletedForGCWI() + { + return this->HasCompleted(); + } + + void LinuxAsyncFileTransaction::CleanupForGCWI() + { + AuResetMember(this->pCompletionGroup_); + } + void LinuxAsyncFileTransaction::LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) { this->lastFinishedStat_ = failure ? 0 : read; @@ -518,7 +581,14 @@ namespace Aurora::IO::FS AuSPtr LinuxAsyncFileTransaction::NewLoopSource() { - return AuStaticCast(AuStaticCast(this->loopSource_)); + if (this->pCompletionGroup_) + { + return this->pCompletionGroup_->ToAnyLoopSource(); + } + else + { + return AuStaticCast(AuStaticCast(this->loopSource_)); + } } AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuString &path, EFileOpenMode openMode, bool bDirectIO, EFileAdvisoryLockLevel lock) diff --git a/Source/IO/FS/Async.Linux.hpp b/Source/IO/FS/Async.Linux.hpp index 2cf598b4..afb3c7fc 100644 --- a/Source/IO/FS/Async.Linux.hpp +++ b/Source/IO/FS/Async.Linux.hpp @@ -44,7 +44,10 @@ namespace Aurora::IO::FS }; - struct LinuxAsyncFileTransaction : IAsyncTransaction, AuEnableSharedFromThis, Aurora::IO::UNIX::ASubmittable + struct LinuxAsyncFileTransaction : IAsyncTransaction, + CompletionGroup::ICompletionGroupWorkItem, + AuEnableSharedFromThis, + Aurora::IO::UNIX::ASubmittable { LinuxAsyncFileTransaction(AuSPtr pProcessBlock); ~LinuxAsyncFileTransaction(); @@ -75,7 +78,16 @@ namespace Aurora::IO::FS void SetBaseOffset(AuUInt64 uBaseOffset) override; - virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override; + void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override; + + bool TryAttachToCompletionGroup(const AuSPtr &pCompletionGroup) override; + + CompletionGroup::ICompletionGroupWorkHandle *ToCompletionGroupHandle() override; + + AuSPtr GetCompletionGroup() override; + + bool HasCompletedForGCWI() override; + void CleanupForGCWI() override; AuSPtr pProcessBlock_; private: @@ -85,9 +97,11 @@ namespace Aurora::IO::FS AuUInt64 uBaseOffset {}; bool latch_ {}; bool bTxFinished_ {}; + AuSPtr pCompletionGroup_; AuSPtr sub_; AuSPtr loopSource_; int error_ {}; bool hasError_ {}; + Loop::ILSEvent *pAltEvent {}; }; } \ No newline at end of file diff --git a/Source/IO/FS/FSPlatformDevices.Linux.cpp b/Source/IO/FS/FSPlatformDevices.Linux.cpp new file mode 100644 index 00000000..9ef8d464 --- /dev/null +++ b/Source/IO/FS/FSPlatformDevices.Linux.cpp @@ -0,0 +1,78 @@ +/*** + Copyright (C) 2023 Jamie Reece Wilson (a/k/a "Reece"). All rights reserved. + + File: FSPlatformDevices.Linux.cpp + Date: 2023-08-11 + Date: 2024-02-XX + Author: Reece +***/ +// TODO: recover work in progress work dated last year +#include +#include "FS.hpp" +#include "FSPlatformDevices.hpp" + +namespace Aurora::IO::FS +{ + AUKN_SYM AuString GetRootFromPath(const AuString &path) + { + return {}; + } + + AUKN_SYM AuResult GetDeviceFromPath(const AuString &path) + { + return {}; + } + + AUKN_SYM AuResult GetLogicalMountFromPath(const AuString &fileOrDirPath) + { + return {}; + } + + AUKN_SYM AuResult GetDeviceFromRoot(const AuString &root) + { + return {}; + } + + AUKN_SYM AuUInt32 GetLogicalSectorSizeFromPath(const AuString &path) + { + return 0; + } + + AUKN_SYM LogicalUsedResponse GetLogicalUsedFromLogicalDevice(const AuString &logicalMountPath) + { + return {}; + } + + AUKN_SYM AuUInt64 GetDeviceSizeInBytes(const AuString &physicalDevicePath) + { + return 0; + } + + AUKN_SYM AuUInt32 GetPhysicalSectorSizeFromPath(const AuString &path) + { + return 0; + } + + AuList SysGetFSDevices() + { + AuList devices; + + return devices; + } + + AUKN_SYM AuUInt32 GetPerformanceBufferSizeFromPath(const AuString &path) + { + return 4096; + } + + AUKN_SYM LogicalUsedResponse GetLogicalUsedFromPath(const AuString &path) + { + auto rootPath = GetLogicalMountFromPath(path); + if (!rootPath) + { + return {}; + } + + return GetLogicalUsedFromLogicalDevice(rootPath.GetResult()); + } +} \ No newline at end of file diff --git a/Source/IO/Loop/LoopQueue.Linux.cpp b/Source/IO/Loop/LoopQueue.Linux.cpp index f3dd114a..203ea015 100644 --- a/Source/IO/Loop/LoopQueue.Linux.cpp +++ b/Source/IO/Loop/LoopQueue.Linux.cpp @@ -229,7 +229,7 @@ namespace Aurora::IO::Loop src->timeoutAbs = (AuUInt64)ms + AuTime::SteadyClockMS(); } - if (!AuTryInsert(this->sources_, src)) + if (!AuTryInsert(this->sources_, AuConstReference(src))) { pWaitable->Unlock(); return false; diff --git a/Source/IO/Net/AuNetSrvWorkers.cpp b/Source/IO/Net/AuNetSrvWorkers.cpp index c82f0fe7..ae044fca 100644 --- a/Source/IO/Net/AuNetSrvWorkers.cpp +++ b/Source/IO/Net/AuNetSrvWorkers.cpp @@ -28,7 +28,7 @@ namespace Aurora::IO::Net return {}; } - if (!AuTryInsert(this->workerPool_, worker)) + if (!AuTryInsert(this->workerPool_, AuConstReference(worker))) { SysPushErrorIO(""); return {}; diff --git a/Source/IO/Net/AuNetStream.Linux.cpp b/Source/IO/Net/AuNetStream.Linux.cpp index 84099a02..f87674bb 100644 --- a/Source/IO/Net/AuNetStream.Linux.cpp +++ b/Source/IO/Net/AuNetStream.Linux.cpp @@ -77,11 +77,15 @@ namespace Aurora::IO::Net { AuLoop::ILSEvent *optEvent {}; - if (auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource()) + if (this->pCompletionGroup_) { - optEvent = AuStaticCast(pLoopSource).get(); + if (auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource()) + { + optEvent = AuStaticCast(pLoopSource).get(); + } } - else + + if (!optEvent) { if (auto pWaitable = this->pWaitable) { @@ -89,7 +93,7 @@ namespace Aurora::IO::Net } } - if (!UNIX::LinuxOverlappedSubmitRead(this->GetSocket(), 0, this, optEvent)) + if (!UNIX::LinuxOverlappedSubmitRead(this->GetSocket(), 0, this, optEvent, true)) { LIOS_SendProcess(0, true, errno); return true; @@ -159,11 +163,15 @@ namespace Aurora::IO::Net { AuLoop::ILSEvent *optEvent {}; - if (auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource()) + if (this->pCompletionGroup_) { - optEvent = AuStaticCast(pLoopSource).get(); + if (auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource()) + { + optEvent = AuStaticCast(pLoopSource).get(); + } } - else + + if (!optEvent) { if (auto pWaitable = this->pWaitable) { @@ -323,16 +331,21 @@ namespace Aurora::IO::Net { AuLoop::ILSEvent *optEvent {}; - if (auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource()) + if (this->pCompletionGroup_) { - return pLoopSource; + if (auto pLoopSource = this->pCompletionGroup_->GetTriggerLoopSource()) + { + return pLoopSource; + } } - else + { if (auto pWaitable = this->pWaitable) { return pWaitable; } + + return this->pWaitable = AuLoop::NewLSEventSlow(false, true); } return {}; @@ -359,7 +372,7 @@ namespace Aurora::IO::Net void LinuxAsyncNetworkTransaction::MakeSyncable() { - this->pWaitable = AuLoop::NewLSEvent(false, true); + this->pWaitable = AuLoop::NewLSEventSlow(false, true); SysAssert(this->pWaitable); //this->overlap.hEvent = (HANDLE)AuStaticCast(this->pWaitable)->GetHandle(); } diff --git a/Source/IO/Net/AuNetWorker.cpp b/Source/IO/Net/AuNetWorker.cpp index 763f340e..40bcfffc 100644 --- a/Source/IO/Net/AuNetWorker.cpp +++ b/Source/IO/Net/AuNetWorker.cpp @@ -18,7 +18,7 @@ namespace Aurora::IO::Net workerIndex_(workerIndex), pIOProcessor_(pIOProcessor) { - this->pEvent_ = AuLoop::NewLSEvent(false, true); + this->pEvent_ = AuLoop::NewLSEventSlow(false, true); SysAssert(bool(this->pEvent_)); } diff --git a/Source/IO/UNIX/IOSubmit.Linux.cpp b/Source/IO/UNIX/IOSubmit.Linux.cpp index 243c35f7..ea5588af 100644 --- a/Source/IO/UNIX/IOSubmit.Linux.cpp +++ b/Source/IO/UNIX/IOSubmit.Linux.cpp @@ -20,6 +20,7 @@ namespace Aurora::IO::UNIX { + static bool LinuxOverlappedWriteWait(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent); static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent); static bool LinuxOverlappedCancel(ASubmittable *context); @@ -72,6 +73,8 @@ namespace Aurora::IO::UNIX return; } + // TODO: re-add LinuxOverlappedWriteWait? + LIOS_Reset(); try @@ -480,6 +483,38 @@ namespace Aurora::IO::UNIX return AuTryInsert(io->submitPendingArray, &submit); } + static bool LinuxOverlappedWriteWait(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent) + { + auto io = GetTls(); + if (!io) + { + return false; + } + + context->bIsWritePending = true; + + context->offset2 = offset; // redundant - always zero + context->fd2 = fd; + context->optEvent2 = optEvent; + + auto &submit = context->GetIOCB(); + submit.aio_data = context->GetData(); + submit.aio_lio_opcode = IOCB_CMD_POLL; + submit.aio_reqprio = 0; + submit.aio_fildes = context->GetOrCreateFdPollForBlockingRead(fd); + submit.aio_offset = 0; + submit.aio_buf = POLLOUT; + submit.aio_nbytes = 0; + + if (submit.aio_fildes == -1) + { + SysPushErrorIO(); + return false; + } + + return AuTryInsert(io->submitPendingArray, &submit); + } + bool LinuxOverlappedSubmitRead(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent, bool bWaitForRead) { if (bWaitForRead) @@ -544,6 +579,11 @@ namespace Aurora::IO::UNIX for (AU_ITERATE_N(i, temp)) { auto handle = AuReinterpretCast(ioEvents[i].data); + if (!handle) + { + continue; + } + io->dwIoSubmits--; auto errNo = 0; @@ -611,6 +651,7 @@ namespace Aurora::IO::UNIX int LinuxOverlappedEpollShim(int epfd, struct epoll_event *events, int maxevents, int timeout) { + bool bRet { true }; timespec targetTime; auto io = GetTls(); if (!io) @@ -667,8 +708,8 @@ namespace Aurora::IO::UNIX AuUInt64 uTimeNow = AuTime::SteadyClockNS(); if (uTargetTime <= uTimeNow) { - errno = EINTR; - return -1; + bRet = false; + goto exit; } AuTime::ns2ts(&targetTime, uTimeNow - uTargetTime); @@ -680,13 +721,7 @@ namespace Aurora::IO::UNIX { for (AU_ITERATE_N(i, temp)) { - auto handle = AuReinterpretCast(ioEvents[i].data); - if (!handle) - { - // loop source has message - bEpollTriggered = true; - } - else + if (auto handle = AuReinterpretCast(ioEvents[i].data)) { io->dwIoSubmits--; @@ -706,6 +741,10 @@ namespace Aurora::IO::UNIX handle->LIOS_SendProcess(bytesTransacted, bError, errNo); } + else + { + bEpollTriggered = true; + } } } else @@ -742,33 +781,44 @@ namespace Aurora::IO::UNIX } } while ((timeout ? !bEpollTriggered : false) || AuExchange(bAgain, false)); + exit: io_event finalEpollEvent {}; - if ((bEpollTriggered) || - (io_cancel(io->context, ptr, &finalEpollEvent) == 0)) + if (!bEpollTriggered) { - // TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true? - } - else - { - // Do I care? - SysPushErrorIO(); - // I don't - // I do again + if (io_cancel(io->context, ptr, &finalEpollEvent) != 0) + { + if (errno != EINPROGRESS) + { + if (errno != EINVAL) // ? + { + SysPushErrorIO("{}", errno); + } + } + else + { + LinuxOverlappedYield(); + } + } } if (bEpollTriggered) { return epoll_wait(epfd, events, maxevents, 0); } - else + else if (bRet) { return 0; } + else + { + return -ETIMEDOUT; + } } bool LinuxOverlappedWaitForOne(AuUInt32 timeout, AuUInt read, AuUInt write, bool &bReadTriggered, bool &bWriteTriggered) { timespec targetTime; + bool bRet { true }; auto io = GetTls(); bWriteTriggered = false; @@ -842,8 +892,8 @@ namespace Aurora::IO::UNIX AuUInt64 uTimeNow = AuTime::SteadyClockNS(); if (uTargetTime <= uTimeNow) { - errno = EINTR; - return false; + bRet = false; + goto exit; } AuTime::ns2ts(&targetTime, uTimeNow - uTargetTime); @@ -866,7 +916,11 @@ namespace Aurora::IO::UNIX else { auto handle = AuReinterpretCast(ioEvents[i].data); - + if (!handle) + { + continue; + } + io->dwIoSubmits--; auto errNo = 0; @@ -915,6 +969,7 @@ namespace Aurora::IO::UNIX } while (timeout ? (!bReadTriggered && !bWriteTriggered) : false); + exit: for (int i = 0; i < iocbIdx; i++) { io_event finalEpollEvent {}; @@ -932,20 +987,24 @@ namespace Aurora::IO::UNIX continue; } - if (io_cancel(io->context, ptrArray[i], &finalEpollEvent) == 0) + // "Fun". Undocumented return values to indicate we need to double our syscall count bc Linux devs cant design shit. + if (io_cancel(io->context, ptrArray[i], &finalEpollEvent) != 0) { - // TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true? - } - else - { - // Do I care? - SysPushErrorIO(); - // I don't - // I do again + if (errno != EINPROGRESS) + { + if (errno != EINVAL) // ? + { + SysPushErrorIO("{}", errno); + } + } + else + { + LinuxOverlappedYield(); + } } } - return true; + return bRet; } bool LinuxOverlappedWaitForAtleastOne(AuUInt32 timeout, const AuList &handles, const AuList &handlesWrite, AuUInt &one, AuUInt &two) @@ -986,6 +1045,10 @@ namespace Aurora::IO::UNIX for (AU_ITERATE_N(i, temp)) { auto handle = AuReinterpretCast(ioEvents[i].data); + if (!handle) + { + continue; + } io->dwIoSubmits--; diff --git a/Source/IO/UNIX/IOSubmit.Linux.hpp b/Source/IO/UNIX/IOSubmit.Linux.hpp index 1dbd7d68..4cd38dc3 100644 --- a/Source/IO/UNIX/IOSubmit.Linux.hpp +++ b/Source/IO/UNIX/IOSubmit.Linux.hpp @@ -38,6 +38,7 @@ namespace Aurora::IO::UNIX // Hack for blocking reads bool bIsReadPending {}; + bool bIsWritePending {}; AuUInt64 offset2 {}; int fd2 {}; AuLoop::ILSEvent *optEvent2 {}; diff --git a/Source/Threading/Threads/AuOSThread.cpp b/Source/Threading/Threads/AuOSThread.cpp index 2a471b0d..2e289fa6 100644 --- a/Source/Threading/Threads/AuOSThread.cpp +++ b/Source/Threading/Threads/AuOSThread.cpp @@ -601,8 +601,6 @@ namespace Aurora::Threading::Threads void OSThread::_ThreadEP() { - AU_DEBUG_MEMCRUNCH; - // Poke TLS reference thread entity // TODO: we need an internal OSThread *TryPokeTLSThread() auto osThread = static_cast(GetThread()); @@ -631,6 +629,8 @@ namespace Aurora::Threading::Threads try { + AU_DEBUG_MEMCRUNCH; + if (auto task = task_) { this->epExecEvent->Set(); @@ -848,6 +848,8 @@ namespace Aurora::Threading::Threads void OSThread::OSAttach() { + AU_DEBUG_MEMCRUNCH; + HandleRegister(this); #if defined(AURORA_IS_LINUX_DERIVED) this->unixThreadId_ = gettid();