[+] Async/overlapped UNIX pipe IO

[*] Further work on Linux LoopQueue for WaitAll
This commit is contained in:
Reece Wilson 2022-04-17 23:46:05 +01:00
parent 06e4411511
commit f1ef6ac43c
6 changed files with 167 additions and 98 deletions

View File

@ -7,7 +7,6 @@
***/
#include <Source/RuntimeInternal.hpp>
#include "FS.hpp"
#include "Async.Linux.hpp"
#include "FileAdvisory.Unix.hpp"
#include <Source/Loop/Loop.hpp>
#include <Source/Loop/LSHandle.hpp>
@ -16,81 +15,10 @@
#include <unistd.h>
#include <fcntl.h>
#include "FileStream.Unix.hpp"
#include "Async.Linux.hpp"
namespace Aurora::IO::FS
{
struct LinuxAsyncFileTransaction;
struct LinuxAsyncFileTransactionLoopSource : Loop::LSEvent
{
LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that);
virtual bool IsSignaled() override;
virtual bool OnTrigger(AuUInt handle) override;
virtual Loop::ELoopSource GetType() override;
private:
AuWPtr<LinuxAsyncFileTransaction> caller_;
};
struct FileHandle
{
~FileHandle();
bool Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock);
int handle {-1};
AuString path;
bool readOnly;
bool directIO;
};
struct LinuxAsyncFileStream : public IAsyncFileStream
{
AuSPtr<IAsyncTransaction> NewTransaction() override;
bool BlockingTruncate(AuUInt64 length) override;
bool BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite &parameters) override;
bool BlockingWrite(AuUInt64 offset, const Memory::MemoryViewStreamRead &parameters) override;
void Init(const AuSPtr<FileHandle> &handle);
AuSPtr<FileHandle> GetHandle();
private:
AuSPtr<FileHandle> handle_;
};
struct LinuxAsyncFileTransaction : IAsyncTransaction, AuEnableSharedFromThis<LinuxAsyncFileTransaction>, Aurora::IO::UNIX::ASubmittable
{
~LinuxAsyncFileTransaction();
bool Init(const AuSPtr<FileHandle> &handle);
bool StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView) override;
bool StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView) override;
bool Complete() override;
AuUInt32 GetLastPacketLength() override;
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) override;
bool Wait(AuUInt32 timeout) override;
AuSPtr<Loop::ILoopSource> NewLoopSource() override;
void DispatchCb();
AuSPtr<FileHandle> GetFileHandle();
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override;
private:
AuSPtr<FileHandle> handle_;
AuUInt32 lastAbstractOffset_ {}, lastFinishedStat_{};
bool latch_ {};
bool bTxFinished_ {};
AuSPtr<IAsyncFinishedSubscriber> sub_;
AuSPtr<LinuxAsyncFileTransactionLoopSource> loopSource_;
};
LinuxAsyncFileTransactionLoopSource::LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that) : caller_(that), Loop::LSEvent(false, false, true)
{
@ -119,11 +47,20 @@ namespace Aurora::IO::FS
FileHandle::~FileHandle()
{
if ((this->handle != 0) &&
(this->handle != -1))
if ((this->readHandle != 0) &&
(this->readHandle != -1))
{
close(this->handle);
::close(this->readHandle);
}
if ((this->writeHandle != 0) &&
(this->writeHandle != -1) &&
(this->writeHandle != this->readHandle))
{
::close(this->writeHandle);
}
this->readHandle = this->writeHandle = -1;
}
bool FileHandle::Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)
@ -153,10 +90,19 @@ namespace Aurora::IO::FS
}
this->directIO = directIO;
this->handle = fileHandle;
this->readHandle = this->writeHandle = fileHandle;
this->readOnly = openMode == EFileOpenMode::eRead;
return true;
}
void FileHandle::Init(int read, int write)
{
this->readHandle = read;
this->writeHandle = write;
this->directIO = true;
this->readOnly = false;
}
AuSPtr<FileHandle> LinuxAsyncFileStream::GetHandle()
{
@ -186,18 +132,18 @@ namespace Aurora::IO::FS
bool LinuxAsyncFileStream::BlockingTruncate(AuUInt64 length)
{
return ::ftruncate(this->handle_->handle, length) != -1;
return ::ftruncate(this->handle_->writeHandle, length) != -1;
}
bool LinuxAsyncFileStream::BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite &parameters)
{
if (!PosixSetOffset(this->handle_->handle, offset))
if (!PosixSetOffset(this->handle_->readHandle, offset))
{
return false;
}
AuUInt32 read;
if (!PosixRead(this->handle_->handle, parameters.ptr, parameters.length, &read))
if (!PosixRead(this->handle_->readHandle, parameters.ptr, parameters.length, &read))
{
return false;
}
@ -207,13 +153,13 @@ namespace Aurora::IO::FS
bool LinuxAsyncFileStream::BlockingWrite(AuUInt64 offset, const Memory::MemoryViewStreamRead &parameters)
{
if (!PosixSetOffset(this->handle_->handle, offset))
if (!PosixSetOffset(this->handle_->writeHandle, offset))
{
return false;
}
AuUInt32 read;
if (!PosixWrite(this->handle_->handle, parameters.ptr, parameters.length, &read))
if (!PosixWrite(this->handle_->writeHandle, parameters.ptr, parameters.length, &read))
{
return false;
}
@ -236,7 +182,7 @@ namespace Aurora::IO::FS
return false;
}
auto fd = this->handle_->handle;
auto fd = this->handle_->readHandle;
if (fd == -1)
{
SysPushErrorUninitialized();
@ -275,7 +221,7 @@ namespace Aurora::IO::FS
return false;
}
auto fd = this->handle_->handle;
auto fd = this->handle_->writeHandle;
if (fd == -1)
{
SysPushErrorUninitialized();

View File

@ -0,0 +1,86 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Async.Linux.hpp
Date: 2022-4-12
Author: Reece
***/
#pragma once
namespace Aurora::IO::FS
{
struct LinuxAsyncFileTransaction;
struct LinuxAsyncFileTransactionLoopSource : Loop::LSEvent
{
LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that);
virtual bool IsSignaled() override;
virtual bool OnTrigger(AuUInt handle) override;
virtual Loop::ELoopSource GetType() override;
private:
AuWPtr<LinuxAsyncFileTransaction> caller_;
};
struct FileHandle
{
~FileHandle();
bool Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock);
void Init(int read, int write);
int readHandle {-1};
int writeHandle {-1};
AuString path;
bool readOnly;
bool directIO;
};
struct LinuxAsyncFileStream : public IAsyncFileStream
{
AuSPtr<IAsyncTransaction> NewTransaction() override;
bool BlockingTruncate(AuUInt64 length) override;
bool BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite &parameters) override;
bool BlockingWrite(AuUInt64 offset, const Memory::MemoryViewStreamRead &parameters) override;
void Init(const AuSPtr<FileHandle> &handle);
AuSPtr<FileHandle> GetHandle();
private:
AuSPtr<FileHandle> handle_;
};
struct LinuxAsyncFileTransaction : IAsyncTransaction, AuEnableSharedFromThis<LinuxAsyncFileTransaction>, Aurora::IO::UNIX::ASubmittable
{
~LinuxAsyncFileTransaction();
bool Init(const AuSPtr<FileHandle> &handle);
bool StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView) override;
bool StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView) override;
bool Complete() override;
AuUInt32 GetLastPacketLength() override;
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) override;
bool Wait(AuUInt32 timeout) override;
AuSPtr<Loop::ILoopSource> NewLoopSource() override;
void DispatchCb();
AuSPtr<FileHandle> GetFileHandle();
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override;
private:
AuSPtr<FileHandle> handle_;
AuUInt32 lastAbstractOffset_ {}, lastFinishedStat_{};
bool latch_ {};
bool bTxFinished_ {};
AuSPtr<IAsyncFinishedSubscriber> sub_;
AuSPtr<LinuxAsyncFileTransactionLoopSource> loopSource_;
};
}

View File

@ -10,6 +10,7 @@
#include "IPCHandle.hpp"
#include "IPCPipe.NT.hpp"
#include <Source/Loop/ILoopSourceEx.hpp>
#include <Source/Loop/LSHandle.hpp>
#include <Source/Loop/LSEvent.hpp>
#include <Source/IO/FS/Async.NT.hpp>
@ -64,8 +65,8 @@ namespace Aurora::IPC
HANDLE clientHandle_ {INVALID_HANDLE_VALUE};
IPCHandle ipcHandle_;
AuSPtr<IO::FS::FileHandle> fsHandle_;
AuSPtr<Loop::ILSEvent> hasClient_;
AuSPtr<IO::FS::NtAsyncFileStream> fsStream_;
AuSPtr<Loop::ILSEvent> hasClient_;
AuSPtr<IPCHasConnectionEvent> lshasConnection_;
bool bFirstTime {true};
};

View File

@ -14,6 +14,11 @@
#include <Source/Loop/ILoopSourceEx.hpp>
#include <Source/Loop/LSHandle.hpp>
#include <Source/Loop/LSEvent.hpp>
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
#include <Source/IO/FS/Async.Linux.hpp>
#include <fcntl.h>
namespace Aurora::IPC
@ -46,6 +51,9 @@ namespace Aurora::IPC
int fds[2] {-1, -1};
int secondary[2] {-1, -1};
AuSPtr<IO::FS::FileHandle> fsHandle_;
AuSPtr<IO::FS::LinuxAsyncFileStream> fsStream_;
IPCHandle readEnd_;
IPCHandle writeEnd_;
AuSPtr<IPCEvent> event_;
@ -57,6 +65,12 @@ namespace Aurora::IPC
readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex)
{
this->handle = fds[0];
this->fsHandle_ = AuMakeShared<IO::FS::FileHandle>();
this->fsStream_ = AuMakeShared<IO::FS::LinuxAsyncFileStream>();
this->fsHandle_->Init(fds2[0], fds2[1]);
this->fsStream_->Init(this->fsHandle_);
}
IPCPipeImpl::~IPCPipeImpl()
@ -66,13 +80,13 @@ namespace Aurora::IPC
if ((fd = AuExchange(fds[0], -1)) != -1)
{
IO::UNIX::FDServeEnd(readEnd_);
::close(fd);
//::close(fd);
}
if ((fd = AuExchange(fds[1], -1)) != -1)
{
IO::UNIX::FDServeEnd(writeEnd_);
::close(fd);
//::close(fd);
}
if ((fd = AuExchange(secondary[0], -1)) != -1)
@ -85,6 +99,9 @@ namespace Aurora::IPC
::close(fd);
}
fsHandle_.reset();
fsStream_.reset();
if (secondary[0] == -1)
{
event_->Reset();
@ -103,7 +120,7 @@ namespace Aurora::IPC
AuSPtr<IO::FS::IAsyncTransaction> IPCPipeImpl::NewAsyncTransaction()
{
return {};
return this->fsStream_->NewTransaction();
}
bool IPCPipeImpl::Read(const Memory::MemoryViewStreamWrite &write, bool nonblock)

View File

@ -420,7 +420,7 @@ namespace Aurora::Loop
// but this hack should apply to wait any as well, so i'm moving it to the DoTick function
anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size();
bTimeout = AuTime::CurrentClockMS() >= timeout;
bTimeout = timeout ? AuTime::CurrentClockMS() >= timeout : false;
} while (anythingLeft && !bTimeout);
{
@ -678,7 +678,7 @@ namespace Aurora::Loop
auto source = AuReinterpretCast<SourceExtended *>(handle)->pin.lock();
auto [ticked, remove] = source->DoWork(readData, writeData);
auto [ticked, remove, noworkers] = source->DoWork(readData, writeData);
bTicked += ticked;
if (ticked)
@ -704,6 +704,21 @@ namespace Aurora::Loop
epoll->Remove(source.get(), readData, writeData);
}
}
// Fire waitall
// not sure i like how this fires all anys and alls.
// this isnt consistent
if (noworkers)
{
AU_LOCK_GUARD(this->polledItemsMutex_->AsReadable());
for (auto epoll : this->alternativeEpolls_)
{
if (epoll != &this->globalEpoll_)
{
epoll->Remove(source.get(), readData, writeData);
}
}
}
}
now = AuTime::CurrentClockMS();
@ -788,7 +803,7 @@ namespace Aurora::Loop
this->bHasCommited = true;
}
AuPair<bool, bool> LoopQueue::SourceExtended::DoWork(bool read, bool write)
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(bool read, bool write)
{
if (!this->sourceExtended)
{
@ -799,21 +814,25 @@ namespace Aurora::Loop
{
AuPair<bool, bool> ret;
bool bSingleOnlyFlag {true};
if (read)
{
auto [a, b] = DoWork(this->sourceExtended->GetHandle());
auto [a, b, c] = DoWork(this->sourceExtended->GetHandle());
ret.first |= a;
ret.second |= b;
bSingleOnlyFlag &= c;
}
if (write)
{
auto [a, b] = DoWork(this->sourceExtended->GetWriteHandle());
auto [a, b, c] = DoWork(this->sourceExtended->GetWriteHandle());
ret.first |= a;
ret.second |= b;
bSingleOnlyFlag &= c;
}
return ret;
return AuMakeTuple(AuGet<0>(ret), AuGet<1>(ret), bSingleOnlyFlag);
}
else
{
@ -822,7 +841,7 @@ namespace Aurora::Loop
}
}
AuPair<bool, bool> LoopQueue::SourceExtended::DoWork(int fd)
AuTuple<bool, bool, bool> LoopQueue::SourceExtended::DoWork(int fd)
{
bool bShouldRemove {true};
AuUInt8 uPosition {};
@ -926,7 +945,7 @@ namespace Aurora::Loop
}
}
}
return AuMakePair(true, bShouldRemove);
return AuMakeTuple(true, bShouldRemove, bOverload);
}
AUKN_SYM AuSPtr<ILoopQueue> NewLoopQueue()

View File

@ -91,8 +91,8 @@ namespace Aurora::Loop
bool bHasCommited {};
// ticked, should remove
AuPair<bool, bool> DoWork(int fd);
AuPair<bool, bool> DoWork(bool read, bool write);
AuTuple<bool, bool, bool> DoWork(int fd);
AuTuple<bool, bool, bool> DoWork(bool read, bool write);
};
struct AnEpoll