[+] Linux Async IO

This commit is contained in:
Reece Wilson 2022-04-13 12:00:35 +01:00
parent aa7404fc25
commit 415116c891
13 changed files with 1342 additions and 15 deletions

0
Include/Aurora/Async/OldTrash.hpp Executable file → Normal file
View File

View File

@ -21,6 +21,19 @@ namespace Aurora::IO::FS
* @param transactions An array mask of FIO transactions
* @param timeout Timeout in milliseconds, zero = indefinite
*/
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &transactions, AuUInt32 timeout);
/**
* A hint to dispatch any outstanding IO requests.
*
* The Aurora Runtime optimizes file FileRead/Writes by batching on some platforms.
* It may be required to call SendBatched for lower latency immediately after a Read/Write.
*
* Consider any call to WaitMultiple, internal loop source signal state, or loop queue wait,
* an internal hint to request an async IO work-queue flush.
*
* @warning This does nothing on NT
* @warning Linux StartRead/StartWrites will not start immediately. They instead wait for a yield or call to SendBatched.
*/
AUKN_SYM void SendBatched();
}

View File

@ -255,6 +255,7 @@ namespace Aurora
TelemetryConfig telemetry;
AsyncConfig async;
FIOConfig fio;
bool bFIODisableBatching {true};
};
/**

View File

@ -25,6 +25,9 @@
#if defined(AURORA_PLATFORM_WIN32)
#include "Extensions/Win32/DarkTheme.hpp"
#endif
#if defined(AURORA_IS_POSIX_DERIVED)
#include "IO/UNIX/UnixIO.hpp"
#endif
//#include "IO/Net/Net.hpp"
#include "Process/ProcessMap.hpp"
#include "Exit/Exit.hpp"
@ -45,6 +48,9 @@ static void Init()
Aurora::Exit::InitExit();
Aurora::Console::Init();
Aurora::IO::FS::InitResources();
#if defined(AURORA_IS_POSIX_DERIVED)
Aurora::IO::UNIX::InitUnixIO();
#endif
Aurora::Console::Init2();
Aurora::HWInfo::Init();
Aurora::Telemetry::Init();

View File

@ -0,0 +1,429 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Async.Linux.cpp
Date: 2022-4-12
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "FS.hpp"
#include "Async.Linux.hpp"
#include "FileAdvisory.Unix.hpp"
#include <Source/Loop/Loop.hpp>
#include <Source/Loop/LSHandle.hpp>
#include <Source/Loop/LSEvent.hpp>
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
#include <unistd.h>
#include <fcntl.h>
namespace Aurora::IO::FS
{
struct LinuxAsyncFileTransaction;
struct LinuxAsyncFileTransactionLoopSource : Loop::LSEvent
{
LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that);
virtual bool IsSignaled() override;
virtual bool OnTrigger(AuUInt handle) override;
virtual Loop::ELoopSource GetType() override;
private:
AuWPtr<LinuxAsyncFileTransaction> caller_;
};
struct FileHandle
{
~FileHandle();
bool Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock);
int handle {-1};
AuString path;
bool readOnly;
bool directIO;
};
struct LinuxAsyncFileStream : public IAsyncFileStream
{
AuSPtr<IAsyncTransaction> NewTransaction() override;
void Init(const AuSPtr<FileHandle> &handle);
AuSPtr<FileHandle> GetHandle();
private:
AuSPtr<FileHandle> handle_;
};
struct LinuxAsyncFileTransaction : IAsyncTransaction, AuEnableSharedFromThis<LinuxAsyncFileTransaction>, Aurora::IO::UNIX::ASubmittable
{
~LinuxAsyncFileTransaction();
bool Init(const AuSPtr<FileHandle> &handle);
bool StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView) override;
bool StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView) override;
bool Complete() override;
AuUInt32 GetLastPacketLength() override;
void SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub) override;
bool Wait(AuUInt32 timeout) override;
AuSPtr<Loop::ILoopSource> NewLoopSource() override;
void DispatchCb();
AuSPtr<FileHandle> GetFileHandle();
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) override;
private:
AuSPtr<FileHandle> handle_;
AuUInt32 lastAbstractOffset_ {}, lastFinishedStat_{};
bool latch_ {};
bool bTxFinished_ {};
AuSPtr<IAsyncFinishedSubscriber> sub_;
AuSPtr<LinuxAsyncFileTransactionLoopSource> loopSource_;
};
LinuxAsyncFileTransactionLoopSource::LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that) : caller_(that), Loop::LSEvent(false, false, true)
{
}
bool LinuxAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle)
{
return IsSignaled();
}
bool LinuxAsyncFileTransactionLoopSource::IsSignaled()
{
auto lock = caller_.lock();
if (!lock) return LSEvent::IsSignaled();
return lock->Complete();
}
Loop::ELoopSource LinuxAsyncFileTransactionLoopSource::GetType()
{
return Loop::ELoopSource::eSourceAIO;
}
LinuxAsyncFileTransaction::~LinuxAsyncFileTransaction()
{
}
FileHandle::~FileHandle()
{
if ((this->handle != 0) &&
(this->handle != -1))
{
close(this->handle);
}
}
bool FileHandle::Init(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)
{
int fileHandle;
auto pathex = NormalizePathRet(path);
if (pathex.empty())
{
return false;
}
fileHandle = ::open(pathex.c_str(),
openMode == EFileOpenMode::eRead ? O_RDONLY : (O_RDWR | O_CREAT),
0664);
if (fileHandle == -1)
{
SysPushErrorIO("Couldn't open file: {} ({}) {}", path, pathex, errno);
return false;
}
this->directIO = directIO;
this->handle = fileHandle;
this->readOnly = openMode == EFileOpenMode::eRead;
return true;
}
AuSPtr<FileHandle> LinuxAsyncFileStream::GetHandle()
{
return handle_;
}
void LinuxAsyncFileStream::Init(const AuSPtr<FileHandle> &handle)
{
this->handle_ = handle;
}
AuSPtr<IAsyncTransaction> LinuxAsyncFileStream::NewTransaction()
{
auto shared = AuMakeShared<LinuxAsyncFileTransaction>();
if (!shared)
{
return {};
}
if (!shared->Init(this->handle_))
{
return {};
}
return shared;
}
bool LinuxAsyncFileTransaction::Init(const AuSPtr<FileHandle> &handle)
{
this->handle_ = handle;
this->loopSource_ = AuMakeShared<LinuxAsyncFileTransactionLoopSource>(AuSharedFromThis());
return bool(this->loopSource_);
}
bool LinuxAsyncFileTransaction::StartRead(AuUInt64 offset, const AuSPtr<AuMemoryViewWrite> &memoryView)
{
if (HasState())
{
SysPushErrorIO("IO Operation can not be reused yet.");
return false;
}
auto fd = this->handle_->handle;
if (fd == -1)
{
SysPushErrorUninitialized();
return false;
}
this->latch_ = false;
this->lastAbstractOffset_ = offset;
this->lastFinishedStat_ = 0;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
if (!UNIX::LinuxOverlappedSubmitRead(fd, offset, this, this->loopSource_.get()))
{
LIOS_SendProcess(0, true, errno);
return false;
}
else
{
if (gRuntimeConfig.bFIODisableBatching)
{
UNIX::SendIOBuffers();
}
return true;
}
}
bool LinuxAsyncFileTransaction::StartWrite(AuUInt64 offset, const AuSPtr<AuMemoryViewRead> &memoryView)
{
if (HasState())
{
SysPushErrorIO("IO Operation can not be reused yet.");
return false;
}
auto fd = this->handle_->handle;
if (fd == -1)
{
SysPushErrorUninitialized();
return false;
}
this->latch_ = false;
this->lastAbstractOffset_ = offset;
this->lastFinishedStat_ = 0;
LIOS_Init(AuSharedFromThis());
SetMemory(memoryView);
if (!UNIX::LinuxOverlappedSubmitWrite(fd, offset, this, this->loopSource_.get()))
{
LIOS_SendProcess(0, true, errno);
return false;
}
else
{
if (gRuntimeConfig.bFIODisableBatching)
{
UNIX::SendIOBuffers();
}
return true;
}
}
void LinuxAsyncFileTransaction::LIOS_Process(AuUInt32 read, bool failure, int err, bool mark)
{
this->lastFinishedStat_ = read;
this->bTxFinished_ = true;
if (mark)
{
return;
}
this->DispatchCb();
}
void LinuxAsyncFileTransaction::DispatchCb()
{
if (AuExchange(this->latch_, true))
{
SysPushErrorGeneric();
return;
}
if (this->sub_)
{
this->sub_->OnAsyncFileOpFinished(this->lastAbstractOffset_, this->lastFinishedStat_);
}
}
bool LinuxAsyncFileTransaction::Complete()
{
if (this->bTxFinished_)
{
if (!this->latch_)
{
DispatchCb();
}
return true;
}
return false;
}
AuUInt32 LinuxAsyncFileTransaction::GetLastPacketLength()
{
return this->lastFinishedStat_;
}
void LinuxAsyncFileTransaction::SetCallback(const AuSPtr<IAsyncFinishedSubscriber> &sub)
{
this->sub_ = sub;
}
bool LinuxAsyncFileTransaction::Wait(AuUInt32 timeout)
{
// TODO:
AuList<AuSPtr<IAsyncTransaction>> files {AuUnsafeRaiiToShared(this)};
return WaitMultiple(files, timeout);
}
AuSPtr<FileHandle> LinuxAsyncFileTransaction::GetFileHandle()
{
return this->handle_;
}
AuSPtr<Loop::ILoopSource> LinuxAsyncFileTransaction::NewLoopSource()
{
return AuStaticCast<Loop::ILoopSource>(AuStaticCast<Loop::ILSEvent>(this->loopSource_));
}
AUKN_SYM IAsyncFileStream *OpenAsyncNew(const AuString &path, EFileOpenMode openMode, bool directIO, EFileAdvisoryLockLevel lock)
{
AuSPtr<FileHandle> fileHandle;
LinuxAsyncFileStream *stream;
if (path.empty())
{
SysPushErrorParam("Empty path");
return {};
}
if (!EFileOpenModeIsValid(openMode))
{
SysPushErrorParam("Invalid open mode");
return {};
}
fileHandle = AuMakeShared<FileHandle>();
if (!fileHandle->Init(path, openMode, directIO, lock))
{
SysPushErrorNested();
return {};
}
stream = _new LinuxAsyncFileStream();
if (!stream)
{
SysPushErrorMem();
return {};
}
stream->Init(fileHandle);
return stream;
}
AUKN_SYM void OpenAsyncRelease(IAsyncFileStream *handle)
{
AuSafeDelete<LinuxAsyncFileStream *>(handle);
}
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &files, AuUInt32 timeout)
{
AuCtorCode_t code;
AuUInt32 count {};
AuUInt64 endTime = AuTime::CurrentInternalClockMS() + AuUInt64(timeout);
auto waitQueue = AuTryConstruct<AuList<AuSPtr<IAsyncTransaction>>>(code, files);
if (!code)
{
return 0;
}
while (true)
{
if (waitQueue.empty())
{
break;
}
AuUInt32 timeoutMS {};
if (timeout)
{
AuInt64 delta = (AuInt64)endTime - AuTime::CurrentInternalClockMS();
if (delta <= 0)
{
break;
}
timeoutMS = delta;
}
if (!UNIX::LinuxOverlappedPoll(timeoutMS))
{
continue;
}
for (auto itr = waitQueue.begin();
itr != waitQueue.end();)
{
if (AuStaticPointerCast<LinuxAsyncFileTransaction>(*itr)->Complete())
{
count ++;
itr = waitQueue.erase(itr);
}
else
{
itr++;
}
}
if (count)
{
break;
}
}
return count;
}
}

View File

@ -26,9 +26,8 @@ namespace Aurora::IO::FS
bool directIO;
};
class NtAsyncFileStream : public IAsyncFileStream
struct NtAsyncFileStream : IAsyncFileStream
{
public:
AuSPtr<IAsyncTransaction> NewTransaction() override;
void Init(const AuSPtr<FileHandle> &handle);
@ -39,9 +38,8 @@ namespace Aurora::IO::FS
AuSPtr<FileHandle> handle_;
};
class NtAsyncFileTransaction : public IAsyncTransaction, AuEnableSharedFromThis<NtAsyncFileTransaction>
struct NtAsyncFileTransaction : IAsyncTransaction, AuEnableSharedFromThis<NtAsyncFileTransaction>
{
public:
~NtAsyncFileTransaction();
bool Init(const AuSPtr<FileHandle> &handle);
@ -73,7 +71,7 @@ namespace Aurora::IO::FS
AuSPtr<IAsyncFinishedSubscriber> sub_;
};
struct NtAsyncFileTransactionLoopSource : public Loop::LSHandle
struct NtAsyncFileTransactionLoopSource : Loop::LSHandle
{
NtAsyncFileTransactionLoopSource(AuSPtr<NtAsyncFileTransaction> that) : caller_(that), Loop::LSHandle(AuUInt(that->GetFileHandle()->handle))
{}
@ -83,9 +81,7 @@ namespace Aurora::IO::FS
virtual Loop::ELoopSource GetType() override;
private:
AuWPtr<NtAsyncFileTransaction> caller_;
};
bool NtAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle)
@ -120,13 +116,13 @@ namespace Aurora::IO::FS
HANDLE fileHandle;
auto pathex = NormalizePathRet(path);
if (!pathex.empty())
if (pathex.empty())
{
return false;
}
auto win32Path = Locale::ConvertFromUTF8(pathex);
if (!win32Path.empty())
if (win32Path.empty())
{
return false;
}
@ -164,7 +160,7 @@ namespace Aurora::IO::FS
this->directIO = directIO;
this->handle = fileHandle;
this->readOnly = readOnly;
this->readOnly = openMode == EFileOpenMode::eRead;
return true;
}

View File

@ -302,9 +302,9 @@ namespace Aurora::IO::FS
CreateDirectories(pathex, true);
}
auto fileHandle = open(pathex.c_str(),
openMode == EFileOpenMode::eRead ? O_RDONLY : (O_RDWR | O_CREAT),
0644);
auto fileHandle = ::open(pathex.c_str(),
openMode == EFileOpenMode::eRead ? O_RDONLY : (O_RDWR | O_CREAT),
0664);
if (fileHandle < 0)
{

View File

@ -0,0 +1,755 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOSubmit.Linux.cpp
Date: 2022-4-12
Author: Reece
Note: I want APCs, overlapped, and event based IO on Linux. Gib.
***/
#include <Source/RuntimeInternal.hpp>
#include "UnixIO.hpp"
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <sys/epoll.h>
#include <poll.h>
#include <arpa/inet.h>
#include "IOSubmit.Linux.hpp"
#include <Source/Time/Time.hpp>
#include <Source/Loop/Loop.hpp>
#include <Source/Loop/LSEvent.hpp>
static int io_setup(unsigned nr, aio_context_t *ctxp)
{
return syscall(__NR_io_setup, nr, ctxp);
}
static int io_destroy(aio_context_t ctx)
{
return syscall(__NR_io_destroy, ctx);
}
static int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
#if 0
static int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
struct io_event *events,
struct timespec *timeout)
{
return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}
#endif
#define read_barrier() __asm__ __volatile__("lfence" ::: "memory")
#define AIO_RING_MAGIC 0xa10a10a1
struct aio_ring {
unsigned id; /** kernel internal index number */
unsigned nr; /** number of io_events */
unsigned head;
unsigned tail;
unsigned magic;
unsigned compat_features;
unsigned incompat_features;
unsigned header_length; /** size of aio_ring */
struct io_event events[0];
};
/* Code based on axboe/fio:
* https://github.com/axboe/fio/blob/702906e9e3e03e9836421d5e5b5eaae3cd99d398/engines/libaio.c#L149-L172
*/
static int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
struct io_event *events,
struct timespec *timeout)
{
int i = 0;
struct aio_ring *ring = (struct aio_ring *)ctx;
if (ring == NULL || ring->magic != AIO_RING_MAGIC) {
goto do_syscall;
}
while (i < max_nr) {
unsigned head = ring->head;
if (head == ring->tail) {
/* There are no more completions */
break;
} else {
/* There is another completion to reap */
events[i] = ring->events[head];
read_barrier();
ring->head = (head + 1) % ring->nr;
i++;
}
}
if (i == 0 && timeout != NULL && timeout->tv_sec == 0 &&
timeout->tv_nsec == 0) {
/* Requested non blocking operation. */
return 0;
}
if (i && i >= min_nr) {
return i;
}
do_syscall:
return syscall(__NR_io_getevents, ctx, min_nr - i, max_nr - i,
&events[i], timeout) + i;
}
static int io_cancel(aio_context_t ctx_id, struct iocb *iocb,
struct io_event *result)
{
return syscall(SYS_io_cancel, ctx_id, iocb, result);
}
namespace Aurora::IO::UNIX
{
//////////////////////////////////////////////////////////////////
// ASubmittable
//////////////////////////////////////////////////////////////////
void ASubmittable::LIOS_SendProcess(AuUInt32 read, bool failure, int err, bool mark)
{
// Allow for reuse by releasing before dispatch
// ...holding "this" ownership with a copy of the ref counter on the stack
AuSPtr<void> pinSelf, pinMem;
pinSelf = this->pin_;
pinMem = this->memPin_;
LIOS_Reset();
LIOS_Process(read, failure, err, mark);
}
void ASubmittable::LIOS_Init(const AuSPtr<void> &pin)
{
this->pin_ = pin;
}
void ASubmittable::LIOS_Reset()
{
this->pin_.reset();
this->memPin_.reset();
}
void ASubmittable::SetMemory(const AuSPtr<AuMemoryViewRead> &view)
{
this->dataPtr_ = AuReinterpretCast<AuUInt64>(view->ptr);
this->dataLen_ = view->length;
this->memPin_ = view;
}
void ASubmittable::SetMemory(const AuSPtr<AuMemoryViewWrite> &view)
{
this->dataPtr_ = AuReinterpretCast<AuUInt64>(view->ptr);
this->dataLen_ = view->length;
this->memPin_ = view;
}
bool ASubmittable::HasState()
{
return bool(this->pin_);
}
AuUInt64 ASubmittable::GetBuf()
{
return this->dataPtr_;
}
AuUInt ASubmittable::GetBufLength()
{
return this->dataLen_;
}
AuUInt64 ASubmittable::GetData()
{
return AuReinterpretCast<AuUInt64>(this);
}
iocb & ASubmittable::GetIOCB()
{
return this->cb;
}
//////////////////////////////////////////////////////////////////
// TLS
//////////////////////////////////////////////////////////////////
struct TLSIO
{
bool bInitialized {};
aio_context_t context {};
AuList<iocb*> submitPendingArray;
AuUInt32 dwIoSubmits;
~TLSIO()
{
Deinit();
}
bool Init();
void Deinit();
};
static thread_local TLSIO tlsLocal;
static TLSIO *GetTls()
{
auto handle = &tlsLocal;
if (!handle->bInitialized)
{
handle->bInitialized = handle->Init();
}
return handle->bInitialized ? handle : nullptr;
}
//////////////////////////////////////////////////////////////////
// TLS IO IMPL
//////////////////////////////////////////////////////////////////
bool TLSIO::Init()
{
if (io_setup(1024, &this->context) != 0)
{
return false;
}
return true;
}
void TLSIO::Deinit()
{
io_destroy(this->context);
}
//////////////////////////////////////////////////////////////////
// GLUE
//////////////////////////////////////////////////////////////////
static bool LinuxOverlappedTrySubmitWorkInternal(TLSIO *io)
{
int index = {};
int startingLength = io->submitPendingArray.size();
while (index != startingLength)
{
int ret = io_submit(io->context, io->submitPendingArray.size(), io->submitPendingArray.data() + index);
if (ret <= 0)
{
switch (ret)
{
case (-EAGAIN):
{
SysPushErrorIO("Insufficient resources are available to queue any iocbs");
goto err;
}
case (-ENOSYS):
{
SysPushErrorIO("io_submit() is not implemented on this architecture");
goto err;
}
case (-EINVAL):
{
SysPushErrorIO("io submit context is dead");
goto err;
}
case (-EPERM):
{
SysPanic("IOPRIO_CLASS_RT was requested but not granted to non-CAP_SYS_ADMIN (root?) process");
}
case (-EFAULT):
{
SysPanic("One of the data structures points to invalid data.");
}
case (-EBADF):
{
auto submittable = AuReinterpretCast<ASubmittable*>(io->submitPendingArray[index]->aio_data);
if (submittable)
{
submittable->LIOS_SendProcess(0, true, EBADF);
index ++;
}
}
default:
SysPanic("unhandled case");
}
}
else
{
index += ret;
io->dwIoSubmits += ret;
}
}
AuTryClear(io->submitPendingArray);
return true;
err:
if (index)
{
io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index);
return true;
}
else
{
return false;
}
}
static bool LinuxOverlappedTrySubmitWork()
{
auto io = GetTls();
if (!io)
{
return false;
}
return LinuxOverlappedTrySubmitWorkInternal(io);
}
//////////////////////////////////////////////////////////////////
// OVERLAPPED API
//////////////////////////////////////////////////////////////////
static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
auto io = GetTls();
if (!io)
{
return false;
}
iocb &submit = context->GetIOCB();
submit.aio_data = context->GetData();
submit.aio_lio_opcode = op;
submit.aio_reqprio = 0;
submit.aio_fildes = fd;
submit.aio_offset = offset;
submit.aio_buf = context->GetBuf();
submit.aio_nbytes = context->GetBufLength();
if (optEvent)
{
submit.aio_resfd = AuStaticCast<AuLoop::LSEvent>(optEvent)->GetHandle();
submit.aio_flags |= IOCB_FLAG_RESFD;
}
return AuTryInsert(io->submitPendingArray, &submit);
}
bool LinuxOverlappedSubmitRead(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
return LinuxOverlappedSubmit(fd, IOCB_CMD_PREAD, offset, context, optEvent);
}
bool LinuxOverlappedSubmitWrite(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
return LinuxOverlappedSubmit(fd, IOCB_CMD_PWRITE, offset, context, optEvent);
}
bool LinuxOverlappedPoll(AuUInt32 timeout)
{
timespec targetTime;
io_event ioEvents[512];
int temp;
int dwApcsSent {};
auto io = GetTls();
if (!io)
{
return false;
}
if (timeout)
{
AuTime::ms2tsabs(&targetTime, timeout);
}
if (!LinuxOverlappedTrySubmitWork())
{
SysPushErrorIO();
errno = EBADF;
return -1;
}
do
{
temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
if (temp >= 0)
{
for (AU_ITERATE_N(i, temp))
{
auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
io->dwIoSubmits--;
auto errNo = 0;
bool bError {};
AuUInt bytesTransacted {};
if (ioEvents[i].res <= 0)
{
errNo = 0 - ioEvents[i].res;
bError = true;
}
else
{
bytesTransacted = ioEvents[i].res;
}
handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
dwApcsSent++;
}
}
else
{
auto err = 0 - temp;
if (err == EINTR)
{
errno = EINTR;
break;
}
if (err == EINVAL)
{
SysPushErrorArg();
break;
}
if (err == EFAULT)
{
SysPanic("Either events or timeout is an invalid pointer.");
}
if (err == ENOSYS)
{
SysPanic("io_getevents() is not implemented on this architecture.");
}
}
} while (timeout);
return dwApcsSent;
}
bool SendIOBuffers()
{
return LinuxOverlappedTrySubmitWork();
}
int LinuxOverlappedEpollShim(int epfd, struct epoll_event *events,
int maxevents, int timeout)
{
timespec targetTime;
auto io = GetTls();
if (!io)
{
return epoll_wait(epfd, events, maxevents, timeout);
}
if (timeout)
{
AuTime::ms2tsabs(&targetTime, timeout);
}
if (!LinuxOverlappedTrySubmitWork())
{
SysPushErrorIO();
errno = EBADF;
return -1;
}
if (!io->dwIoSubmits)
{
return epoll_wait(epfd, events, maxevents, timeout);
}
// evil nested epoll garbage
// but i'm sure the kevent way would be more inefficient
iocb pollSubmit {};
pollSubmit.aio_fildes = epfd;
pollSubmit.aio_lio_opcode = IOCB_CMD_POLL;
pollSubmit.aio_buf = POLLIN;
iocb * ptr = &pollSubmit;
iocb ** ptrArray = &ptr;
if (io_submit(io->context, 1, ptrArray) <= 0)
{
errno = EFAULT;
SysPushErrorIO();
return -1;
}
bool bEpollTriggered {};
io_event ioEvents[512];
int temp;
do
{
temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
if (temp >= 0)
{
for (AU_ITERATE_N(i, temp))
{
auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
if (!handle)
{
// loop source has message
bEpollTriggered = true;
}
else
{
io->dwIoSubmits--;
auto errNo = 0;
bool bError {};
AuUInt bytesTransacted {};
if (ioEvents[i].res <= 0)
{
errNo = 0 - ioEvents[i].res;
bError = true;
}
else
{
bytesTransacted = ioEvents[i].res;
}
handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
}
}
}
else
{
auto err = 0 - temp;
if (err == EINTR)
{
errno = EINTR;
break;
}
if (err == EINVAL)
{
SysPushErrorArg();
break;
}
if (err == EFAULT)
{
SysPanic("Either events or timeout is an invalid pointer.");
}
if (err == ENOSYS)
{
SysPanic("io_getevents() is not implemented on this architecture.");
}
}
} while (timeout ? !bEpollTriggered : false);
io_event finalEpollEvent;
if (io_cancel(io->context, ptr, &finalEpollEvent) == 0)
{
// TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true?
}
else
{
// Do I care?
//SysPushErrorIO();
// I don't
}
if (bEpollTriggered)
{
return epoll_wait(epfd, events, maxevents, 0);
}
else
{
return 0;
}
}
bool LinuxOverlappedWaitForOne(AuUInt32 timeout, AuUInt read, AuUInt write, bool &bReadTriggered, bool &bWriteTriggered)
{
timespec targetTime;
auto io = GetTls();
bWriteTriggered = false;
bReadTriggered = false;
if (!io)
{
return false;
}
if (timeout)
{
AuTime::ms2tsabs(&targetTime, timeout);
}
if (!LinuxOverlappedTrySubmitWork())
{
SysPushErrorIO();
errno = EBADF;
return false;
}
if (!io->dwIoSubmits)
{
return false;
}
iocb submit {};
iocb readSubmit {};
iocb writeSubmit {};
submit.aio_lio_opcode = IOCB_CMD_POLL;
iocb * ptrArray[2];
int iocbIdx {};
if (read != -1)
{
readSubmit = submit;
readSubmit.aio_fildes = read;
readSubmit.aio_data = 1;
readSubmit.aio_buf = POLLIN;
ptrArray[iocbIdx++] = &readSubmit;
}
if (write != -1)
{
writeSubmit = submit;
writeSubmit.aio_fildes = write;
writeSubmit.aio_data = 2;
writeSubmit.aio_buf = POLLOUT;
ptrArray[iocbIdx++] = &writeSubmit;
}
if (io_submit(io->context, iocbIdx, ptrArray) <= 0)
{
errno = EFAULT;
SysPushErrorIO();
return -1;
}
io_event ioEvents[512];
int temp;
do
{
temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
if (temp >= 0)
{
for (AU_ITERATE_N(i, temp))
{
if (ioEvents[i].data == 1)
{
bReadTriggered = true;
}
else if (ioEvents[i].data == 2)
{
bWriteTriggered = true;
}
else
{
auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
io->dwIoSubmits--;
auto errNo = 0;
bool bError {};
AuUInt bytesTransacted {};
if (ioEvents[i].res <= 0)
{
errNo = 0 - ioEvents[i].res;
bError = true;
}
else
{
bytesTransacted = ioEvents[i].res;
}
handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
}
}
}
else
{
auto err = 0 - temp;
if (err == EINTR)
{
errno = EINTR;
break;
}
if (err == EINVAL)
{
SysPushErrorArg();
break;
}
if (err == EFAULT)
{
SysPanic("Either events or timeout is an invalid pointer.");
}
if (err == ENOSYS)
{
SysPanic("io_getevents() is not implemented on this architecture.");
}
}
} while (timeout ? (!bReadTriggered && !bWriteTriggered) : false);
for (int i = 0; i < iocbIdx; i++)
{
io_event finalEpollEvent;
if (io_cancel(io->context, ptrArray[i], &finalEpollEvent) == 0)
{
// TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true?
}
else
{
// Do I care?
//SysPushErrorIO();
// I don't
}
}
return true;
}
bool LinuxOverlappedWaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, const AuList<AuUInt> &handlesWrite, AuUInt &one, AuUInt &two)
{
// TODO:
return false;
}
}

View File

@ -0,0 +1,58 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOSubmit.Linux.hpp
Date: 2022-4-12
Author: Reece
Note: I want APCs, overlapped, and event based IO on Linux. Gib.
***/
#pragma once
#include <linux/aio_abi.h>
namespace Aurora::IO::UNIX
{
struct ASubmittable
{
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) = 0;
void LIOS_SendProcess(AuUInt32 read, bool failure, int err, bool mark = false);
void LIOS_Init(const AuSPtr<void> &pin);
void LIOS_Reset();
bool HasState();
void SetMemory(const AuSPtr<AuMemoryViewRead> &view);
void SetMemory(const AuSPtr<AuMemoryViewWrite> &view);
AuUInt64 GetData();
AuUInt64 GetBuf();
AuUInt GetBufLength();
iocb & GetIOCB();
private:
AuUInt64 dataPtr_ {};
AuUInt dataLen_ {};
AuSPtr<void> pin_;
AuSPtr<void> memPin_;
iocb cb {};
};
// Yield/Wait shims
bool LinuxOverlappedWaitForOne (AuUInt32 timeout, AuUInt read, AuUInt write, bool &bReadTriggered, bool &bWriteTriggered);
bool LinuxOverlappedWaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, const AuList<AuUInt> &handlesWrite, AuUInt &one, AuUInt &two);
bool LinuxOverlappedPoll (AuUInt32 timeout);
int LinuxOverlappedEpollShim(int epfd, struct epoll_event *events,
int maxevents, int timeout);
// Work queue
bool LinuxOverlappedSubmitRead (int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent);
bool LinuxOverlappedSubmitWrite(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent);
// TODO: Stream copy operations
// Bookkeeping
bool SendIOBuffers();
}

33
Source/IO/UNIX/UnixIO.cpp Normal file
View File

@ -0,0 +1,33 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: UnixIO.cpp
Date: 2022-4-12
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "UnixIO.hpp"
namespace Aurora::IO::UNIX
{
void InitUnixIO()
{
}
}
#if defined(AURORA_IS_LINUX_DERIVED)
#include "IOSubmit.Linux.hpp"
#endif
namespace Aurora::IO
{
AUKN_SYM void SendBatched()
{
#if defined(AURORA_IS_LINUX_DERIVED)
UNIX::SendIOBuffers();
#endif
}
}

13
Source/IO/UNIX/UnixIO.hpp Normal file
View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: UnixIO.hpp
Date: 2022-4-12
Author: Reece
***/
#pragma once
namespace Aurora::IO::UNIX
{
void InitUnixIO();
}

View File

@ -11,6 +11,7 @@
#include "LoopQueue.Linux.hpp"
#include <sys/epoll.h>
#include <Source/Time/Time.hpp>
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
namespace Aurora::Loop
{
@ -658,7 +659,7 @@ namespace Aurora::Loop
deltaMS = nonblock ? 0 : -1;
}
int iEvents = epoll_wait(this->epollFd_, events, AuArraySize(events), deltaMS);
int iEvents = IO::UNIX::LinuxOverlappedEpollShim(this->epollFd_, events, AuArraySize(events), deltaMS);
if (iEvents == -1)
{
goto out;

View File

@ -9,6 +9,10 @@
#include "WaitSingle.hpp"
#include <Source/Time/Time.hpp>
#if defined(AURORA_IS_LINUX_DERIVED)
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
#endif
namespace Aurora::Loop
{
bool WaitSingleGeneric::WaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, const AuList<AuUInt> &handlesWrite, AuUInt &one, AuUInt &two)
@ -17,6 +21,8 @@ namespace Aurora::Loop
AuUInt maxHandle {};
struct timeval tv {};
// TODO: IO SUBMIT HOOK
FD_ZERO(&readSet);
FD_ZERO(&writeSet);
@ -81,6 +87,22 @@ namespace Aurora::Loop
struct timeval tv {};
int maxFd {};
#if defined(AURORA_IS_LINUX_DERIVED)
{
bool bWriteTriggered, bReadTriggered;
if (IO::UNIX::LinuxOverlappedWaitForOne(timeout, read, write, bReadTriggered, bWriteTriggered))
{
if (!bWriteTriggered && !bReadTriggered)
{
return false;
}
return true;
}
}
#endif
FD_ZERO(&readSet);
FD_ZERO(&writeSet);